Refactor message bus facility to avoid confusion with the event bus used for 
external notification; planning to use it in VMSync


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/85e73d18
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/85e73d18
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/85e73d18

Branch: refs/heads/internallb
Commit: 85e73d18f563a1f114bcd08ab57bf44652d203da
Parents: e7e862d
Author: Kelven Yang <[email protected]>
Authored: Thu Apr 11 15:57:19 2013 -0700
Committer: Kelven Yang <[email protected]>
Committed: Mon Apr 29 14:36:03 2013 -0700

----------------------------------------------------------------------
 client/tomcatconf/applicationContext.xml.in        |    2 +-
 core/src/com/cloud/vm/VMInstanceVO.java            |    1 -
 .../framework/client/ClientEventBus.java           |    4 +-
 .../cloudstack/framework/eventbus/EventBus.java    |   32 --
 .../framework/eventbus/EventBusBase.java           |  308 ---------------
 .../framework/eventbus/EventBusEndpoint.java       |   61 ---
 .../framework/eventbus/EventDispatcher.java        |  104 -----
 .../framework/eventbus/EventHandler.java           |   30 --
 .../framework/eventbus/PublishScope.java           |   24 --
 .../cloudstack/framework/eventbus/Subscriber.java  |   24 --
 .../framework/messagebus/MessageBus.java           |   32 ++
 .../framework/messagebus/MessageBusBase.java       |  308 +++++++++++++++
 .../framework/messagebus/MessageBusEndpoint.java   |   61 +++
 .../framework/messagebus/MessageDispatcher.java    |  104 +++++
 .../framework/messagebus/MessageHandler.java       |   30 ++
 .../framework/messagebus/MessageSubscriber.java    |   24 ++
 .../framework/messagebus/PublishScope.java         |   24 ++
 .../framework/server/ServerEventBus.java           |    4 +-
 .../sampleserver/SampleManagerComponent.java       |   12 +-
 .../sampleserver/SampleManagerComponent2.java      |   12 +-
 .../vmware/manager/VmwareManagerImpl.java          |    3 +-
 21 files changed, 602 insertions(+), 602 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/client/tomcatconf/applicationContext.xml.in
----------------------------------------------------------------------
diff --git a/client/tomcatconf/applicationContext.xml.in 
b/client/tomcatconf/applicationContext.xml.in
index 1f5ab20..2e340bf 100644
--- a/client/tomcatconf/applicationContext.xml.in
+++ b/client/tomcatconf/applicationContext.xml.in
@@ -82,7 +82,7 @@
     <property name="messageSerializer" ref="messageSerializer" />
   </bean>
 
-  <bean id="eventBus" class = 
"org.apache.cloudstack.framework.eventbus.EventBusBase" />
+  <bean id="messageBus" class = 
"org.apache.cloudstack.framework.messagebus.MessageBusBase" />
 
   <!--
     DAO with customized configuration

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/core/src/com/cloud/vm/VMInstanceVO.java
----------------------------------------------------------------------
diff --git a/core/src/com/cloud/vm/VMInstanceVO.java 
b/core/src/com/cloud/vm/VMInstanceVO.java
index 77e9c02..5ec2712 100644
--- a/core/src/com/cloud/vm/VMInstanceVO.java
+++ b/core/src/com/cloud/vm/VMInstanceVO.java
@@ -153,7 +153,6 @@ public class VMInstanceVO implements VirtualMachine, 
FiniteStateObject<State, Vi
 
     @Column(name="uuid")
     protected String uuid = UUID.randomUUID().toString();
-    ;
 
     @Column(name="disk_offering_id")
     protected Long diskOfferingId;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java 
b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java
index 7930bf2..d876b01 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java
@@ -18,10 +18,10 @@
  */
 package org.apache.cloudstack.framework.client;
 
-import org.apache.cloudstack.framework.eventbus.EventBusBase;
+import org.apache.cloudstack.framework.messagebus.MessageBusBase;
 import org.apache.cloudstack.framework.transport.TransportMultiplexier;
 
-public class ClientEventBus extends EventBusBase implements 
TransportMultiplexier {
+public class ClientEventBus extends MessageBusBase implements 
TransportMultiplexier {
 
        @Override
        public void onTransportMessage(String senderEndpointAddress,

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBus.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBus.java 
b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBus.java
deleted file mode 100644
index 200715c..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBus.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cloudstack.framework.eventbus;
-
-import org.apache.cloudstack.framework.serializer.MessageSerializer;
-
-public interface EventBus {
-       void setMessageSerializer(MessageSerializer messageSerializer);
-       MessageSerializer getMessageSerializer();
-       
-       void subscribe(String subject, Subscriber subscriber);
-       void unsubscribe(String subject, Subscriber subscriber);
-       
-       void publish(String senderAddress, String subject, PublishScope scope, 
Object args);
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusBase.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusBase.java 
b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusBase.java
deleted file mode 100644
index 30a847f..0000000
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusBase.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cloudstack.framework.eventbus;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.cloudstack.framework.serializer.MessageSerializer;
-
-public class EventBusBase implements EventBus {
-
-       private Gate _gate;
-       private List<ActionRecord> _pendingActions;
-       
-       private SubscriptionNode _subscriberRoot;
-       private MessageSerializer _messageSerializer; 
-       
-       public EventBusBase() {
-               _gate = new Gate();
-               _pendingActions = new ArrayList<ActionRecord>();
-               
-               _subscriberRoot = new SubscriptionNode("/", null);
-       }
-       
-       @Override
-       public void setMessageSerializer(MessageSerializer messageSerializer) {
-               _messageSerializer = messageSerializer;
-       }
-       
-       @Override
-       public MessageSerializer getMessageSerializer() {
-               return _messageSerializer;
-       }
-       
-       @Override
-       public void subscribe(String subject, Subscriber subscriber) {
-               assert(subject != null);
-               assert(subscriber != null);
-               if(_gate.enter()) {
-                       SubscriptionNode current = locate(subject, null, true);
-                       assert(current != null);
-                       current.addSubscriber(subscriber);
-                       _gate.leave();
-               } else {
-                       synchronized(_pendingActions) {
-                               _pendingActions.add(new 
ActionRecord(ActionType.Subscribe, subject, subscriber));
-                       }
-               }
-       }
-
-       @Override
-       public void unsubscribe(String subject, Subscriber subscriber) {
-               if(_gate.enter()) {
-                       SubscriptionNode current = locate(subject, null, false);
-                       if(current != null)
-                               current.removeSubscriber(subscriber);
-                       
-                       _gate.leave();
-               } else {
-                       synchronized(_pendingActions) {
-                               _pendingActions.add(new 
ActionRecord(ActionType.Unsubscribe, subject, subscriber));
-                       }
-               }
-       }
-
-       @Override
-       public void publish(String senderAddress, String subject, PublishScope 
scope, 
-               Object args) {
-               
-               if(_gate.enter(true)) {
-
-                       List<SubscriptionNode> chainFromTop = new 
ArrayList<SubscriptionNode>();
-                       SubscriptionNode current = locate(subject, 
chainFromTop, false);
-                       
-                       if(current != null)
-                               current.notifySubscribers(senderAddress, 
subject, args);
-                       
-                       Collections.reverse(chainFromTop);
-                       for(SubscriptionNode node : chainFromTop)
-                               node.notifySubscribers(senderAddress, subject, 
args);
-                       
-                       _gate.leave();
-               }
-       }
-       
-       private void onGateOpen() {
-               synchronized(_pendingActions) {
-                       ActionRecord record = null;
-                       if(_pendingActions.size() > 0) {
-                               while((record = _pendingActions.remove(0)) != 
null) {
-                                       switch(record.getType()) {
-                                       case Subscribe :
-                                               {
-                                                       SubscriptionNode 
current = locate(record.getSubject(), null, true);
-                                                       assert(current != null);
-                                                       
current.addSubscriber(record.getSubscriber());
-                                               }
-                                               break;
-                                               
-                                       case Unsubscribe :
-                                               {
-                                                       SubscriptionNode 
current = locate(record.getSubject(), null, false);
-                                                       if(current != null)
-                                                               
current.removeSubscriber(record.getSubscriber());
-                                               }
-                                               break;
-                                               
-                                       default :
-                                               assert(false);
-                                               break;
-                                       
-                                       }
-                               }
-                       }
-               }
-       }
-       
-       
-       private SubscriptionNode locate(String subject, List<SubscriptionNode> 
chainFromTop,
-               boolean createPath) {
-               
-               assert(subject != null);
-               
-               String[] subjectPathTokens = subject.split("\\.");
-               return locate(subjectPathTokens, _subscriberRoot, chainFromTop, 
createPath);
-       }
-       
-       private static SubscriptionNode locate(String[] subjectPathTokens, 
-               SubscriptionNode current, List<SubscriptionNode> chainFromTop, 
boolean createPath) {
-               
-               assert(current != null);
-               assert(subjectPathTokens != null);
-               assert(subjectPathTokens.length > 0);
-
-               if(chainFromTop != null)
-                       chainFromTop.add(current);
-               
-               SubscriptionNode next = current.getChild(subjectPathTokens[0]);
-               if(next == null) {
-                       if(createPath) {
-                               next = new 
SubscriptionNode(subjectPathTokens[0], null);
-                               current.addChild(subjectPathTokens[0], next);
-                       } else {
-                               return null;
-                       }
-               }
-               
-               if(subjectPathTokens.length > 1) {
-                       return 
locate((String[])Arrays.copyOfRange(subjectPathTokens, 1, 
subjectPathTokens.length),
-                               next, chainFromTop, createPath);
-               } else {
-                       return next;
-               }
-       }
-       
-       
-       //
-       // Support inner classes
-       //
-       private static enum ActionType {
-               Subscribe,
-               Unsubscribe
-       }
-       
-       private static class ActionRecord {
-               private ActionType _type;
-               private String _subject;
-               private Subscriber _subscriber;
-               
-               public ActionRecord(ActionType type, String subject, Subscriber 
subscriber) {
-                       _type = type;
-                       _subject = subject;
-                       _subscriber = subscriber;
-               }
-               
-               public ActionType getType() { 
-                       return _type; 
-               }
-               
-               public String getSubject() {
-                       return _subject;
-               }
-               
-               public Subscriber getSubscriber() {
-                       return _subscriber;
-               }
-       }
-       
-       private class Gate {
-               private int _reentranceCount;
-               private Thread _gateOwner;
-               
-               public Gate() {
-                       _reentranceCount = 0;
-                       _gateOwner = null;
-               }
-               
-               public boolean enter() {
-                       return enter(false);
-               }
-               
-               public boolean enter(boolean wait) {
-                       while(true) {
-                               synchronized(this) {
-                                       if(_reentranceCount == 0) {
-                                               assert(_gateOwner == null);
-                                               
-                                               _reentranceCount++;
-                                               _gateOwner = 
Thread.currentThread();
-                                               return true;
-                                       } else {
-                                               if(wait) {
-                                                       try {
-                                                               wait();
-                                                       } catch 
(InterruptedException e) {
-                                                       }
-                                               } else {
-                                                       break;
-                                               }
-                                       }
-                               }
-                       }
-                       
-                       return false;
-               }
-               
-               public void leave() {
-                       synchronized(this) {
-                               if(_reentranceCount > 0) {
-                                       assert(_gateOwner == 
Thread.currentThread());
-                                       
-                                       onGateOpen();
-                                       _reentranceCount--;
-                                       assert(_reentranceCount == 0);
-                                       _gateOwner = null;
-                                       
-                                       notifyAll();
-                               }
-                       }
-               }
-       }
-       
-       private static class SubscriptionNode {
-               @SuppressWarnings("unused")
-               private String _nodeKey;
-               private List<Subscriber> _subscribers;
-               private Map<String, SubscriptionNode> _children;
-               
-               public SubscriptionNode(String nodeKey, Subscriber subscriber) {
-                       assert(nodeKey != null);
-                       _nodeKey = nodeKey;
-                       _subscribers = new ArrayList<Subscriber>();
-                       
-                       if(subscriber != null)
-                               _subscribers.add(subscriber);
-                       
-                       _children = new HashMap<String, SubscriptionNode>();
-               }
-               
-               @SuppressWarnings("unused")
-               public List<Subscriber> getSubscriber() {
-                       return _subscribers;
-               }
-               
-               public void addSubscriber(Subscriber subscriber) {
-                       _subscribers.add(subscriber);
-               }
-               
-               public void removeSubscriber(Subscriber subscriber) {
-                       _subscribers.remove(subscriber);
-               }
-               
-               public SubscriptionNode getChild(String key) {
-                       return _children.get(key);
-               }
-               
-               public void addChild(String key, SubscriptionNode childNode) {
-                       _children.put(key, childNode);
-               }
-               
-               public void notifySubscribers(String senderAddress, String 
subject,  Object args) {
-                       for(Subscriber subscriber : _subscribers) {
-                               subscriber.onPublishEvent(senderAddress, 
subject, args);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusEndpoint.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusEndpoint.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusEndpoint.java
deleted file mode 100644
index 19a9b03..0000000
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusEndpoint.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cloudstack.framework.eventbus;
-
-
-public class EventBusEndpoint {
-       private EventBus _eventBus;
-       private String _sender;
-       private PublishScope _scope;
-       
-       public EventBusEndpoint(EventBus eventBus, String sender, PublishScope 
scope) {
-               _eventBus = eventBus;
-               _sender = sender;
-               _scope = scope;
-       }
-       
-       public EventBusEndpoint setEventBus(EventBus eventBus) {
-               _eventBus = eventBus;
-               return this;
-       }
-       
-       public EventBusEndpoint setScope(PublishScope scope) {
-               _scope = scope;
-               return this;
-       }
-       
-       public PublishScope getScope() {
-               return _scope;
-       }
-       
-       public EventBusEndpoint setSender(String sender) {
-               _sender = sender;
-               return this;
-       }
-       
-       public String getSender() {
-               return _sender;
-       }
-       
-       public void Publish(String subject, Object args) {
-               assert(_eventBus != null);
-               _eventBus.publish(_sender, subject, _scope, args);
-       }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventDispatcher.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventDispatcher.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventDispatcher.java
deleted file mode 100644
index 336a994..0000000
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventDispatcher.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.eventbus;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.Map;
-
-
-public class EventDispatcher implements Subscriber {
-       private static Map<Class<?>, Method> s_handlerCache = new 
HashMap<Class<?>, Method>();
-       
-       private static Map<Object, EventDispatcher> s_targetMap = new 
HashMap<Object, EventDispatcher>();
-       private Object _targetObject;
-       
-       public EventDispatcher(Object targetObject) {
-               _targetObject = targetObject;
-       }
-       
-       @Override
-       public void onPublishEvent(String senderAddress, String subject, Object 
args) {
-               dispatch(_targetObject, subject, senderAddress, args);
-       }
-       
-       public static EventDispatcher getDispatcher(Object targetObject) {
-               EventDispatcher dispatcher;
-               synchronized(s_targetMap) {
-                       dispatcher = s_targetMap.get(targetObject);
-                       if(dispatcher == null) {
-                               dispatcher = new EventDispatcher(targetObject);
-                               s_targetMap.put(targetObject, dispatcher);
-                       }
-               }
-               return dispatcher;
-       }
-       
-       public static void removeDispatcher(Object targetObject) {
-               synchronized(s_targetMap) {
-                       s_targetMap.remove(targetObject);
-               }
-       }
-       
-       public static boolean dispatch(Object target, String subject, String 
senderAddress, Object args) {
-               assert(subject != null);
-               assert(target != null);
-               
-               Method handler = resolveHandler(target.getClass(), subject);
-               if(handler == null)
-                       return false;
-               
-               try {
-                       handler.invoke(target, subject, senderAddress, args);
-               } catch (IllegalArgumentException e) {
-                       throw new RuntimeException("IllegalArgumentException 
when invoking event handler for subject: " + subject);
-               } catch (IllegalAccessException e) {
-                       throw new RuntimeException("IllegalAccessException when 
invoking event handler for subject: " + subject);
-               } catch (InvocationTargetException e) {
-                       throw new RuntimeException("InvocationTargetException 
when invoking event handler for subject: " + subject);
-               }
-               
-               return true;
-       }
-       
-       public static Method resolveHandler(Class<?> handlerClz, String 
subject) {
-               synchronized(s_handlerCache) {
-                       Method handler = s_handlerCache.get(handlerClz);
-                       if(handler != null)
-                               return handler;
-                       
-                       for(Method method : handlerClz.getMethods()) {
-                               EventHandler annotation = 
method.getAnnotation(EventHandler.class);
-                               if(annotation != null) {
-                                       if(match(annotation.topic(), subject)) {
-                                               s_handlerCache.put(handlerClz, 
method);
-                                               return method;
-                                       }
-                               }
-                       }
-               }
-               
-               return null;
-       }
-       
-       private static boolean match(String expression, String param) {
-               return param.matches(expression);
-       }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventHandler.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventHandler.java 
b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventHandler.java
deleted file mode 100644
index 1ed3a00..0000000
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventHandler.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.eventbus;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface EventHandler {
-       public String topic();
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/PublishScope.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/PublishScope.java 
b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/PublishScope.java
deleted file mode 100644
index 539a242..0000000
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/PublishScope.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cloudstack.framework.eventbus;
-
-public enum PublishScope {
-       LOCAL, GLOBAL 
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/Subscriber.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/Subscriber.java 
b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/Subscriber.java
deleted file mode 100644
index 28b86de..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/Subscriber.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cloudstack.framework.eventbus;
-
-public interface Subscriber {
-       void onPublishEvent(String senderAddress, String subject, Object args);
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java
new file mode 100644
index 0000000..4aa007d
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.messagebus;
+
+import org.apache.cloudstack.framework.serializer.MessageSerializer;
+
+/**
+ * Publish/subscribe message bus used for notification between components.
+ *
+ * A subject identifies what a message is about. MessageBusBase, the base
+ * implementation, treats the subject as a dot-separated path.
+ */
+public interface MessageBus {
+       /**
+        * @return the serializer previously given to
+        *         {@link #setMessageSerializer(MessageSerializer)}
+        */
+       MessageSerializer getMessageSerializer();
+
+       /**
+        * @param messageSerializer serializer to associate with this bus
+        */
+       void setMessageSerializer(MessageSerializer messageSerializer);
+
+       /**
+        * Registers a subscriber for a subject.
+        *
+        * @param subject subject to listen on
+        * @param subscriber callback that receives the published messages
+        */
+       void subscribe(String subject, MessageSubscriber subscriber);
+
+       /**
+        * Removes a registration made through
+        * {@link #subscribe(String, MessageSubscriber)}.
+        *
+        * @param subject subject the subscriber was registered on
+        * @param subscriber callback to remove
+        */
+       void unsubscribe(String subject, MessageSubscriber subscriber);
+
+       /**
+        * Publishes a message to the subscribers of a subject.
+        *
+        * @param senderAddress address of the publisher, handed to subscribers as-is
+        * @param subject subject of the message
+        * @param scope requested publish scope
+        * @param args message payload, handed to subscribers as-is
+        */
+       void publish(String senderAddress, String subject, PublishScope scope, Object args);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
new file mode 100644
index 0000000..5b7af4d
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.messagebus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cloudstack.framework.serializer.MessageSerializer;
+
+/**
+ * In-memory {@link MessageBus} that keeps subscribers in a tree keyed by the
+ * dot-separated tokens of a subject ("a.b.c" resolves to root -> a -> b -> c).
+ *
+ * Access to the tree is serialized by a non-reentrant gate. A subscribe or
+ * unsubscribe request that arrives while the gate is held (for example from
+ * inside a subscriber callback) is queued and replayed by the gate owner just
+ * before it releases the gate.
+ */
+public class MessageBusBase implements MessageBus {
+
+       private final Gate _gate;
+       private final List<ActionRecord> _pendingActions;
+
+       private final SubscriptionNode _subscriberRoot;
+       private MessageSerializer _messageSerializer;
+
+       public MessageBusBase() {
+               _gate = new Gate();
+               _pendingActions = new ArrayList<ActionRecord>();
+
+               _subscriberRoot = new SubscriptionNode("/", null);
+       }
+
+       @Override
+       public void setMessageSerializer(MessageSerializer messageSerializer) {
+               _messageSerializer = messageSerializer;
+       }
+
+       @Override
+       public MessageSerializer getMessageSerializer() {
+               return _messageSerializer;
+       }
+
+       /**
+        * Registers the subscriber on the node for the subject, creating the path
+        * when it does not exist yet. Never blocks: when the gate is held the
+        * request is deferred until the current owner leaves.
+        */
+       @Override
+       public void subscribe(String subject, MessageSubscriber subscriber) {
+               assert(subject != null);
+               assert(subscriber != null);
+               if(enterOrDefer(new ActionRecord(ActionType.Subscribe, subject, subscriber))) {
+                       try {
+                               SubscriptionNode current = locate(subject, null, true);
+                               assert(current != null);
+                               current.addSubscriber(subscriber);
+                       } finally {
+                               // always reopen the gate, otherwise one failure blocks the bus for good
+                               _gate.leave();
+                       }
+               }
+       }
+
+       /**
+        * Removes the subscriber from the node for the subject, if that node
+        * exists. Never blocks: when the gate is held the request is deferred
+        * until the current owner leaves.
+        */
+       @Override
+       public void unsubscribe(String subject, MessageSubscriber subscriber) {
+               if(enterOrDefer(new ActionRecord(ActionType.Unsubscribe, subject, subscriber))) {
+                       try {
+                               SubscriptionNode current = locate(subject, null, false);
+                               if(current != null)
+                                       current.removeSubscriber(subscriber);
+                       } finally {
+                               _gate.leave();
+                       }
+               }
+       }
+
+       /**
+        * Delivers the message to the subscribers of the subject's own node (when
+        * it exists) and then to the subscribers of every existing ancestor node,
+        * from the nearest ancestor up to the root. The scope argument is not
+        * used by this implementation.
+        *
+        * Blocks until the gate is available. The gate is not reentrant, so a
+        * subscriber callback must not call publish() on the publishing thread.
+        */
+       @Override
+       public void publish(String senderAddress, String subject, PublishScope scope,
+               Object args) {
+
+               if(_gate.enter(true)) {
+                       try {
+                               List<SubscriptionNode> chainFromTop = new ArrayList<SubscriptionNode>();
+                               SubscriptionNode current = locate(subject, chainFromTop, false);
+
+                               if(current != null)
+                                       current.notifySubscribers(senderAddress, subject, args);
+
+                               Collections.reverse(chainFromTop);
+                               for(SubscriptionNode node : chainFromTop)
+                                       node.notifySubscribers(senderAddress, subject, args);
+                       } finally {
+                               // a throwing subscriber must not leave the gate closed
+                               _gate.leave();
+                       }
+               }
+       }
+
+       /**
+        * Tries to take the gate without waiting; when it is held, queues the
+        * record instead. Both steps happen under the gate monitor, which
+        * Gate.leave() also holds while draining the queue, so a queued record
+        * cannot miss the drain of the current owner.
+        *
+        * @return true when the caller now owns the gate and has to apply the
+        *         action itself, false when the action was queued
+        */
+       private boolean enterOrDefer(ActionRecord record) {
+               synchronized(_gate) {
+                       if(_gate.enter())
+                               return true;
+
+                       synchronized(_pendingActions) {
+                               _pendingActions.add(record);
+                       }
+                       return false;
+               }
+       }
+
+       /**
+        * Replays the queued subscribe/unsubscribe requests in arrival order.
+        * Called by the gate owner right before the gate is released.
+        */
+       private void onGateOpen() {
+               synchronized(_pendingActions) {
+                       // ArrayList.remove(0) throws on an empty list, so test for emptiness
+                       // instead of waiting for a null element
+                       while(!_pendingActions.isEmpty()) {
+                               ActionRecord record = _pendingActions.remove(0);
+                               switch(record.getType()) {
+                               case Subscribe :
+                                       {
+                                               SubscriptionNode current = locate(record.getSubject(), null, true);
+                                               assert(current != null);
+                                               current.addSubscriber(record.getSubscriber());
+                                       }
+                                       break;
+
+                               case Unsubscribe :
+                                       {
+                                               SubscriptionNode current = locate(record.getSubject(), null, false);
+                                               if(current != null)
+                                                       current.removeSubscriber(record.getSubscriber());
+                                       }
+                                       break;
+
+                               default :
+                                       assert(false);
+                                       break;
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Resolves a dot-separated subject to its node, starting at the root.
+        *
+        * @param chainFromTop when not null, receives every node visited above
+        *        the result, root first
+        * @param createPath whether missing nodes are created on the way down
+        * @return the node for the subject, or null when it does not exist and
+        *         createPath is false
+        */
+       private SubscriptionNode locate(String subject, List<SubscriptionNode> chainFromTop,
+               boolean createPath) {
+
+               assert(subject != null);
+
+               String[] subjectPathTokens = subject.split("\\.");
+               return locate(subjectPathTokens, _subscriberRoot, chainFromTop, createPath);
+       }
+
+       private static SubscriptionNode locate(String[] subjectPathTokens,
+               SubscriptionNode current, List<SubscriptionNode> chainFromTop, boolean createPath) {
+
+               assert(current != null);
+               assert(subjectPathTokens != null);
+
+               // String.split drops trailing empty tokens, so a subject such as "." has no tokens at all
+               if(subjectPathTokens.length == 0)
+                       throw new IllegalArgumentException("Subject does not contain any path token");
+
+               SubscriptionNode node = current;
+               for(String token : subjectPathTokens) {
+                       if(chainFromTop != null)
+                               chainFromTop.add(node);
+
+                       SubscriptionNode next = node.getChild(token);
+                       if(next == null) {
+                               if(!createPath)
+                                       return null;
+
+                               next = new SubscriptionNode(token, null);
+                               node.addChild(token, next);
+                       }
+                       node = next;
+               }
+               return node;
+       }
+
+
+       //
+       // Support inner classes
+       //
+       private static enum ActionType {
+               Subscribe,
+               Unsubscribe
+       }
+
+       // a subscribe/unsubscribe request that could not be applied immediately
+       private static class ActionRecord {
+               private final ActionType _type;
+               private final String _subject;
+               private final MessageSubscriber _subscriber;
+
+               public ActionRecord(ActionType type, String subject, MessageSubscriber subscriber) {
+                       _type = type;
+                       _subject = subject;
+                       _subscriber = subscriber;
+               }
+
+               public ActionType getType() {
+                       return _type;
+               }
+
+               public String getSubject() {
+                       return _subject;
+               }
+
+               public MessageSubscriber getSubscriber() {
+                       return _subscriber;
+               }
+       }
+
+       // non-reentrant mutual exclusion around the subscription tree; not static
+       // because leave() calls back into the enclosing bus
+       private class Gate {
+               private int _reentranceCount;
+               private Thread _gateOwner;
+
+               public Gate() {
+                       _reentranceCount = 0;
+                       _gateOwner = null;
+               }
+
+               public boolean enter() {
+                       return enter(false);
+               }
+
+               /**
+                * @param wait whether to block until the gate becomes free
+                * @return true when the calling thread now owns the gate
+                */
+               public boolean enter(boolean wait) {
+                       boolean interrupted = false;
+                       try {
+                               synchronized(this) {
+                                       while(_reentranceCount != 0) {
+                                               if(!wait)
+                                                       return false;
+
+                                               try {
+                                                       wait();
+                                               } catch (InterruptedException e) {
+                                                       // keep waiting for the gate, but remember the request
+                                                       interrupted = true;
+                                               }
+                                       }
+
+                                       assert(_gateOwner == null);
+                                       _reentranceCount++;
+                                       _gateOwner = Thread.currentThread();
+                                       return true;
+                               }
+                       } finally {
+                               // restore the interrupt status only on the way out; doing it
+                               // inside the loop would make wait() throw again immediately
+                               if(interrupted)
+                                       Thread.currentThread().interrupt();
+                       }
+               }
+
+               public void leave() {
+                       synchronized(this) {
+                               if(_reentranceCount > 0) {
+                                       assert(_gateOwner == Thread.currentThread());
+
+                                       try {
+                                               onGateOpen();
+                                       } finally {
+                                               // release even when replaying a pending action failed
+                                               _reentranceCount--;
+                                               assert(_reentranceCount == 0);
+                                               _gateOwner = null;
+
+                                               notifyAll();
+                                       }
+                               }
+                       }
+               }
+       }
+
+       // one node of the subject tree together with the subscribers registered on it
+       private static class SubscriptionNode {
+               @SuppressWarnings("unused")
+               private final String _nodeKey;
+               private final List<MessageSubscriber> _subscribers;
+               private final Map<String, SubscriptionNode> _children;
+
+               public SubscriptionNode(String nodeKey, MessageSubscriber subscriber) {
+                       assert(nodeKey != null);
+                       _nodeKey = nodeKey;
+                       _subscribers = new ArrayList<MessageSubscriber>();
+
+                       if(subscriber != null)
+                               _subscribers.add(subscriber);
+
+                       _children = new HashMap<String, SubscriptionNode>();
+               }
+
+               @SuppressWarnings("unused")
+               public List<MessageSubscriber> getSubscriber() {
+                       return _subscribers;
+               }
+
+               public void addSubscriber(MessageSubscriber subscriber) {
+                       _subscribers.add(subscriber);
+               }
+
+               public void removeSubscriber(MessageSubscriber subscriber) {
+                       _subscribers.remove(subscriber);
+               }
+
+               public SubscriptionNode getChild(String key) {
+                       return _children.get(key);
+               }
+
+               public void addChild(String key, SubscriptionNode childNode) {
+                       _children.put(key, childNode);
+               }
+
+               public void notifySubscribers(String senderAddress, String subject,  Object args) {
+                       for(MessageSubscriber subscriber : _subscribers) {
+                               subscriber.onPublishMessage(senderAddress, subject, args);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusEndpoint.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusEndpoint.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusEndpoint.java
new file mode 100644
index 0000000..0824e13
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusEndpoint.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.messagebus;
+
+
+/**
+ * Publishing handle that remembers a bus, a sender address and a publish
+ * scope, so callers only supply the subject and the payload. The setters
+ * return this endpoint to allow chaining.
+ */
+public class MessageBusEndpoint {
+       private MessageBus _eventBus;
+       private String _sender;
+       private PublishScope _scope;
+
+       public MessageBusEndpoint(MessageBus eventBus, String sender, PublishScope scope) {
+               _scope = scope;
+               _sender = sender;
+               _eventBus = eventBus;
+       }
+
+       /** @return the sender address passed along with every published message */
+       public String getSender() {
+               return _sender;
+       }
+
+       /** @return the scope passed along with every published message */
+       public PublishScope getScope() {
+               return _scope;
+       }
+
+       /** Replaces the bus that messages are published to. */
+       public MessageBusEndpoint setEventBus(MessageBus eventBus) {
+               _eventBus = eventBus;
+               return this;
+       }
+
+       /** Replaces the sender address used for subsequent messages. */
+       public MessageBusEndpoint setSender(String sender) {
+               _sender = sender;
+               return this;
+       }
+
+       /** Replaces the scope used for subsequent messages. */
+       public MessageBusEndpoint setScope(PublishScope scope) {
+               _scope = scope;
+               return this;
+       }
+
+       /**
+        * Publishes the payload on the configured bus using the stored sender
+        * address and scope.
+        *
+        * @param subject subject of the message
+        * @param args message payload
+        */
+       public void Publish(String subject, Object args) {
+               final MessageBus bus = _eventBus;
+               assert(bus != null);
+               bus.publish(_sender, subject, _scope, args);
+       }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
new file mode 100644
index 0000000..ac75afb
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messagebus;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * {@link MessageSubscriber} that forwards each message to a method of a
+ * target object annotated with {@link MessageHandler}. The annotation's
+ * topic is used as a regular expression that must match the whole subject.
+ * Handlers are invoked with the arguments (subject, senderAddress, args).
+ */
+public class MessageDispatcher implements MessageSubscriber {
+       // annotated handler methods of each target class, keyed by topic expression
+       private static final Map<Class<?>, Map<String, Method>> s_handlerCache = new HashMap<Class<?>, Map<String, Method>>();
+
+       // one shared dispatcher per target object; an entry stays until removeDispatcher() is called
+       private static final Map<Object, MessageDispatcher> s_targetMap = new HashMap<Object, MessageDispatcher>();
+       private final Object _targetObject;
+
+       public MessageDispatcher(Object targetObject) {
+               _targetObject = targetObject;
+       }
+
+       @Override
+       public void onPublishMessage(String senderAddress, String subject, Object args) {
+               dispatch(_targetObject, subject, senderAddress, args);
+       }
+
+       /**
+        * Returns the dispatcher registered for the target object, creating and
+        * registering one on first use.
+        */
+       public static MessageDispatcher getDispatcher(Object targetObject) {
+               MessageDispatcher dispatcher;
+               synchronized(s_targetMap) {
+                       dispatcher = s_targetMap.get(targetObject);
+                       if(dispatcher == null) {
+                               dispatcher = new MessageDispatcher(targetObject);
+                               s_targetMap.put(targetObject, dispatcher);
+                       }
+               }
+               return dispatcher;
+       }
+
+       /** Forgets the dispatcher registered for the target object, if any. */
+       public static void removeDispatcher(Object targetObject) {
+               synchronized(s_targetMap) {
+                       s_targetMap.remove(targetObject);
+               }
+       }
+
+       /**
+        * Invokes the handler of the target that matches the subject.
+        *
+        * @return false when the target class has no handler for the subject,
+        *         true after the handler has been invoked
+        * @throws RuntimeException when the handler cannot be invoked or throws;
+        *         the original exception is attached as the cause
+        */
+       public static boolean dispatch(Object target, String subject, String senderAddress, Object args) {
+               assert(subject != null);
+               assert(target != null);
+
+               Method handler = resolveHandler(target.getClass(), subject);
+               if(handler == null)
+                       return false;
+
+               try {
+                       handler.invoke(target, subject, senderAddress, args);
+               } catch (IllegalArgumentException e) {
+                       throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject, e);
+               } catch (IllegalAccessException e) {
+                       throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject, e);
+               } catch (InvocationTargetException e) {
+                       throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject, e);
+               }
+
+               return true;
+       }
+
+       /**
+        * Finds a handler method of the class whose topic matches the subject.
+        * The annotated methods of a class are collected once and cached, but the
+        * subject is matched on every call, so one class may handle several
+        * topics.
+        *
+        * @return a matching handler method, or null when there is none
+        */
+       public static Method resolveHandler(Class<?> handlerClz, String subject) {
+               synchronized(s_handlerCache) {
+                       Map<String, Method> handlers = s_handlerCache.get(handlerClz);
+                       if(handlers == null) {
+                               handlers = findHandlers(handlerClz);
+                               s_handlerCache.put(handlerClz, handlers);
+                       }
+
+                       for(Map.Entry<String, Method> entry : handlers.entrySet()) {
+                               if(match(entry.getKey(), subject))
+                                       return entry.getValue();
+                       }
+               }
+
+               return null;
+       }
+
+       // Collects the annotated methods declared by the class and its
+       // superclasses, whatever their visibility.
+       private static Map<String, Method> findHandlers(Class<?> handlerClz) {
+               Map<String, Method> handlers = new HashMap<String, Method>();
+               for(Class<?> clz = handlerClz; clz != null; clz = clz.getSuperclass()) {
+                       for(Method method : clz.getDeclaredMethods()) {
+                               MessageHandler annotation = method.getAnnotation(MessageHandler.class);
+                               // subclasses are visited first, so their handler wins for a given topic
+                               if(annotation != null && !handlers.containsKey(annotation.topic())) {
+                                       // handlers are not required to be public
+                                       method.setAccessible(true);
+                                       handlers.put(annotation.topic(), method);
+                               }
+                       }
+               }
+               return handlers;
+       }
+
+       private static boolean match(String expression, String param) {
+               return param.matches(expression);
+       }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageHandler.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageHandler.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageHandler.java
new file mode 100644
index 0000000..d9f51df
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messagebus;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a method as a handler for messages delivered through
+ * MessageDispatcher. The annotation is retained at runtime so it can be
+ * discovered by reflection.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface MessageHandler {
+       /**
+        * Topic this method handles. MessageDispatcher matches it against the
+        * subject with String.matches, i.e. as a regular expression.
+        */
+       String topic();
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageSubscriber.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageSubscriber.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageSubscriber.java
new file mode 100644
index 0000000..072f98d
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageSubscriber.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.messagebus;
+
+/**
+ * Callback registered with MessageBus.subscribe to receive published
+ * messages.
+ */
+public interface MessageSubscriber {
+       /**
+        * Called for each message delivered to this subscriber.
+        *
+        * @param senderAddress address given by the publisher
+        * @param subject subject the message was published under
+        * @param args message payload given by the publisher
+        */
+       void onPublishMessage(String senderAddress, String subject, Object args);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/PublishScope.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/PublishScope.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/PublishScope.java
new file mode 100644
index 0000000..2b3d8ac
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/PublishScope.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.messagebus;
+
+/**
+ * Scope argument of MessageBus.publish.
+ *
+ * NOTE(review): presumably LOCAL limits delivery to this process and GLOBAL
+ * extends it beyond; confirm against the bus implementations. MessageBusBase
+ * does not look at the scope.
+ */
+public enum PublishScope {
+       LOCAL,
+       GLOBAL
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java 
b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java
index 11bc428..f3b782d 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java
@@ -18,10 +18,10 @@
  */
 package org.apache.cloudstack.framework.server;
 
-import org.apache.cloudstack.framework.eventbus.EventBusBase;
+import org.apache.cloudstack.framework.messagebus.MessageBusBase;
 import org.apache.cloudstack.framework.transport.TransportMultiplexier;
 
-public class ServerEventBus extends EventBusBase implements 
TransportMultiplexier {
+public class ServerEventBus extends MessageBusBase implements 
TransportMultiplexier {
 
        @Override
        public void onTransportMessage(String senderEndpointAddress,

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent.java
 
b/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent.java
index 7b0a2ec..d59fe42 100644
--- 
a/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent.java
+++ 
b/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent.java
@@ -24,9 +24,9 @@ import java.util.TimerTask;
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 
-import org.apache.cloudstack.framework.eventbus.EventBus;
-import org.apache.cloudstack.framework.eventbus.EventDispatcher;
-import org.apache.cloudstack.framework.eventbus.EventHandler;
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
+import org.apache.cloudstack.framework.messagebus.MessageHandler;
 import org.apache.cloudstack.framework.rpc.RpcCallbackListener;
 import org.apache.cloudstack.framework.rpc.RpcException;
 import org.apache.cloudstack.framework.rpc.RpcProvider;
@@ -41,7 +41,7 @@ public class SampleManagerComponent {
     private static final Logger s_logger = 
Logger.getLogger(SampleManagerComponent.class);
        
        @Inject
-       private EventBus _eventBus;
+       private MessageBus _eventBus;
        
        @Inject
        private RpcProvider _rpcProvider;
@@ -58,7 +58,7 @@ public class SampleManagerComponent {
                        
                // subscribe to all network events (for example)
                _eventBus.subscribe("network", 
-                       EventDispatcher.getDispatcher(this));
+                       MessageDispatcher.getDispatcher(this));
                
                _timer.schedule(new TimerTask() {
                                public void run() {
@@ -72,7 +72,7 @@ public class SampleManagerComponent {
                call.completeCall("NetworkPrepare completed");
        }
        
-       @EventHandler(topic="network.prepare")
+       @MessageHandler(topic="network.prepare")
        void onPrepareNetwork(String sender, String topic, Object args) {
        }
        

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent2.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent2.java
 
b/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent2.java
index dc482c0..448c37b 100644
--- 
a/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent2.java
+++ 
b/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent2.java
@@ -21,9 +21,9 @@ package org.apache.cloudstack.framework.sampleserver;
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 
-import org.apache.cloudstack.framework.eventbus.EventBus;
-import org.apache.cloudstack.framework.eventbus.EventDispatcher;
-import org.apache.cloudstack.framework.eventbus.EventHandler;
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
+import org.apache.cloudstack.framework.messagebus.MessageHandler;
 import org.apache.cloudstack.framework.rpc.RpcProvider;
 import org.apache.cloudstack.framework.rpc.RpcServerCall;
 import org.apache.cloudstack.framework.rpc.RpcServiceDispatcher;
@@ -36,7 +36,7 @@ public class SampleManagerComponent2 {
     private static final Logger s_logger = 
Logger.getLogger(SampleManagerComponent2.class);
        
        @Inject
-       private EventBus _eventBus;
+       private MessageBus _eventBus;
 
        @Inject
        private RpcProvider _rpcProvider;
@@ -51,7 +51,7 @@ public class SampleManagerComponent2 {
                        
                // subscribe to all network events (for example)
                _eventBus.subscribe("storage", 
-                       EventDispatcher.getDispatcher(this));
+                       MessageDispatcher.getDispatcher(this));
        }
        
        @RpcServiceHandler(command="StoragePrepare")
@@ -66,7 +66,7 @@ public class SampleManagerComponent2 {
                call.completeCall(answer);
        }
        
-       @EventHandler(topic="storage.prepare")
+       @MessageHandler(topic="storage.prepare")
        void onPrepareNetwork(String sender, String topic, Object args) {
        }
        

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/manager/VmwareManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/manager/VmwareManagerImpl.java
 
b/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/manager/VmwareManagerImpl.java
index eb09af0..9f260f1 100755
--- 
a/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/manager/VmwareManagerImpl.java
+++ 
b/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/manager/VmwareManagerImpl.java
@@ -494,7 +494,8 @@ public class VmwareManagerImpl extends ManagerBase 
implements VmwareManager, Vmw
                         s_logger.info("Inject SSH key pairs before copying 
systemvm.iso into secondary storage");
                         _configServer.updateKeyPairs();
 
-
+                        s_logger.info("Copy System VM patch ISO file to 
secondary storage. source ISO: " + srcIso.getAbsolutePath() +
+                               ", destination: " + destIso.getAbsolutePath());
                         try {
                             FileUtil.copyfile(srcIso, destIso);
                         } catch(IOException e) {

Reply via email to