Refactor message bus facitlity to avoid confusing with event bus for external notification, planning to use it in VMSync
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/85e73d18 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/85e73d18 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/85e73d18 Branch: refs/heads/internallb Commit: 85e73d18f563a1f114bcd08ab57bf44652d203da Parents: e7e862d Author: Kelven Yang <[email protected]> Authored: Thu Apr 11 15:57:19 2013 -0700 Committer: Kelven Yang <[email protected]> Committed: Mon Apr 29 14:36:03 2013 -0700 ---------------------------------------------------------------------- client/tomcatconf/applicationContext.xml.in | 2 +- core/src/com/cloud/vm/VMInstanceVO.java | 1 - .../framework/client/ClientEventBus.java | 4 +- .../cloudstack/framework/eventbus/EventBus.java | 32 -- .../framework/eventbus/EventBusBase.java | 308 --------------- .../framework/eventbus/EventBusEndpoint.java | 61 --- .../framework/eventbus/EventDispatcher.java | 104 ----- .../framework/eventbus/EventHandler.java | 30 -- .../framework/eventbus/PublishScope.java | 24 -- .../cloudstack/framework/eventbus/Subscriber.java | 24 -- .../framework/messagebus/MessageBus.java | 32 ++ .../framework/messagebus/MessageBusBase.java | 308 +++++++++++++++ .../framework/messagebus/MessageBusEndpoint.java | 61 +++ .../framework/messagebus/MessageDispatcher.java | 104 +++++ .../framework/messagebus/MessageHandler.java | 30 ++ .../framework/messagebus/MessageSubscriber.java | 24 ++ .../framework/messagebus/PublishScope.java | 24 ++ .../framework/server/ServerEventBus.java | 4 +- .../sampleserver/SampleManagerComponent.java | 12 +- .../sampleserver/SampleManagerComponent2.java | 12 +- .../vmware/manager/VmwareManagerImpl.java | 3 +- 21 files changed, 602 insertions(+), 602 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/client/tomcatconf/applicationContext.xml.in ---------------------------------------------------------------------- diff --git a/client/tomcatconf/applicationContext.xml.in b/client/tomcatconf/applicationContext.xml.in index 1f5ab20..2e340bf 100644 --- a/client/tomcatconf/applicationContext.xml.in +++ b/client/tomcatconf/applicationContext.xml.in @@ -82,7 +82,7 @@ <property name="messageSerializer" ref="messageSerializer" /> </bean> - <bean id="eventBus" class = "org.apache.cloudstack.framework.eventbus.EventBusBase" /> + <bean id="messageBus" class = "org.apache.cloudstack.framework.messagebus.MessageBusBase" /> <!-- DAO with customized configuration http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/core/src/com/cloud/vm/VMInstanceVO.java ---------------------------------------------------------------------- diff --git a/core/src/com/cloud/vm/VMInstanceVO.java b/core/src/com/cloud/vm/VMInstanceVO.java index 77e9c02..5ec2712 100644 --- a/core/src/com/cloud/vm/VMInstanceVO.java +++ b/core/src/com/cloud/vm/VMInstanceVO.java @@ -153,7 +153,6 @@ public class VMInstanceVO implements VirtualMachine, FiniteStateObject<State, Vi @Column(name="uuid") protected String uuid = UUID.randomUUID().toString(); - ; @Column(name="disk_offering_id") protected Long diskOfferingId; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java index 7930bf2..d876b01 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java @@ -18,10 +18,10 @@ */ package org.apache.cloudstack.framework.client; -import org.apache.cloudstack.framework.eventbus.EventBusBase; +import org.apache.cloudstack.framework.messagebus.MessageBusBase; import org.apache.cloudstack.framework.transport.TransportMultiplexier; -public class ClientEventBus extends EventBusBase implements TransportMultiplexier { +public class ClientEventBus extends MessageBusBase implements TransportMultiplexier { @Override public void onTransportMessage(String senderEndpointAddress, http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBus.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBus.java deleted file mode 100644 index 200715c..0000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBus.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cloudstack.framework.eventbus; - -import org.apache.cloudstack.framework.serializer.MessageSerializer; - -public interface EventBus { - void setMessageSerializer(MessageSerializer messageSerializer); - MessageSerializer getMessageSerializer(); - - void subscribe(String subject, Subscriber subscriber); - void unsubscribe(String subject, Subscriber subscriber); - - void publish(String senderAddress, String subject, PublishScope scope, Object args); -} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusBase.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusBase.java deleted file mode 100644 index 30a847f..0000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusBase.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cloudstack.framework.eventbus; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.cloudstack.framework.serializer.MessageSerializer; - -public class EventBusBase implements EventBus { - - private Gate _gate; - private List<ActionRecord> _pendingActions; - - private SubscriptionNode _subscriberRoot; - private MessageSerializer _messageSerializer; - - public EventBusBase() { - _gate = new Gate(); - _pendingActions = new ArrayList<ActionRecord>(); - - _subscriberRoot = new SubscriptionNode("/", null); - } - - @Override - public void setMessageSerializer(MessageSerializer messageSerializer) { - _messageSerializer = messageSerializer; - } - - @Override - public MessageSerializer getMessageSerializer() { - return _messageSerializer; - } - - @Override - public void subscribe(String subject, Subscriber subscriber) { - assert(subject != null); - assert(subscriber != null); - if(_gate.enter()) { - SubscriptionNode current = locate(subject, null, true); - assert(current != null); - current.addSubscriber(subscriber); - _gate.leave(); - } else { - synchronized(_pendingActions) { - _pendingActions.add(new ActionRecord(ActionType.Subscribe, subject, subscriber)); - } - } - } - - @Override - public void unsubscribe(String subject, Subscriber subscriber) { - if(_gate.enter()) { - SubscriptionNode current = locate(subject, null, false); - if(current != null) - current.removeSubscriber(subscriber); - - _gate.leave(); - } else { - synchronized(_pendingActions) { - _pendingActions.add(new ActionRecord(ActionType.Unsubscribe, subject, subscriber)); - } - } - } - - @Override - public void publish(String senderAddress, String subject, PublishScope scope, - Object args) { - - if(_gate.enter(true)) { - - List<SubscriptionNode> chainFromTop = new ArrayList<SubscriptionNode>(); - SubscriptionNode current = locate(subject, chainFromTop, false); - - if(current != null) - current.notifySubscribers(senderAddress, subject, args); - - Collections.reverse(chainFromTop); - for(SubscriptionNode node : chainFromTop) - node.notifySubscribers(senderAddress, subject, args); - - _gate.leave(); - } - } - - private void onGateOpen() { - synchronized(_pendingActions) { - ActionRecord record = null; - if(_pendingActions.size() > 0) { - while((record = _pendingActions.remove(0)) != null) { - switch(record.getType()) { - case Subscribe : - { - SubscriptionNode current = locate(record.getSubject(), null, true); - assert(current != null); - current.addSubscriber(record.getSubscriber()); - } - break; - - case Unsubscribe : - { - SubscriptionNode current = locate(record.getSubject(), null, false); - if(current != null) - current.removeSubscriber(record.getSubscriber()); - } - break; - - default : - assert(false); - break; - - } - } - } - } - } - - - private SubscriptionNode locate(String subject, List<SubscriptionNode> chainFromTop, - boolean createPath) { - - assert(subject != null); - - String[] subjectPathTokens = subject.split("\\."); - return locate(subjectPathTokens, _subscriberRoot, chainFromTop, createPath); - } - - private static SubscriptionNode locate(String[] subjectPathTokens, - SubscriptionNode current, List<SubscriptionNode> chainFromTop, boolean createPath) { - - assert(current != null); - assert(subjectPathTokens != null); - assert(subjectPathTokens.length > 0); - - if(chainFromTop != null) - chainFromTop.add(current); - - SubscriptionNode next = current.getChild(subjectPathTokens[0]); - if(next == null) { - if(createPath) { - next = new SubscriptionNode(subjectPathTokens[0], null); - current.addChild(subjectPathTokens[0], next); - } else { - return null; - } - } - - if(subjectPathTokens.length > 1) { - return locate((String[])Arrays.copyOfRange(subjectPathTokens, 1, subjectPathTokens.length), - next, chainFromTop, createPath); - } else { - return next; - } - } - - - // - // Support inner classes - // - private static enum ActionType { - Subscribe, - Unsubscribe - } - - private static class ActionRecord { - private ActionType _type; - private String _subject; - private Subscriber _subscriber; - - public ActionRecord(ActionType type, String subject, Subscriber subscriber) { - _type = type; - _subject = subject; - _subscriber = subscriber; - } - - public ActionType getType() { - return _type; - } - - public String getSubject() { - return _subject; - } - - public Subscriber getSubscriber() { - return _subscriber; - } - } - - private class Gate { - private int _reentranceCount; - private Thread _gateOwner; - - public Gate() { - _reentranceCount = 0; - _gateOwner = null; - } - - public boolean enter() { - return enter(false); - } - - public boolean enter(boolean wait) { - while(true) { - synchronized(this) { - if(_reentranceCount == 0) { - assert(_gateOwner == null); - - _reentranceCount++; - _gateOwner = Thread.currentThread(); - return true; - } else { - if(wait) { - try { - wait(); - } catch (InterruptedException e) { - } - } else { - break; - } - } - } - } - - return false; - } - - public void leave() { - synchronized(this) { - if(_reentranceCount > 0) { - assert(_gateOwner == Thread.currentThread()); - - onGateOpen(); - _reentranceCount--; - assert(_reentranceCount == 0); - _gateOwner = null; - - notifyAll(); - } - } - } - } - - private static class SubscriptionNode { - @SuppressWarnings("unused") - private String _nodeKey; - private List<Subscriber> _subscribers; - private Map<String, SubscriptionNode> _children; - - public SubscriptionNode(String nodeKey, Subscriber subscriber) { - assert(nodeKey != null); - _nodeKey = nodeKey; - _subscribers = new ArrayList<Subscriber>(); - - if(subscriber != null) - _subscribers.add(subscriber); - - _children = new HashMap<String, SubscriptionNode>(); - } - - @SuppressWarnings("unused") - public List<Subscriber> getSubscriber() { - return _subscribers; - } - - public void addSubscriber(Subscriber subscriber) { - _subscribers.add(subscriber); - } - - public void removeSubscriber(Subscriber subscriber) { - _subscribers.remove(subscriber); - } - - public SubscriptionNode getChild(String key) { - return _children.get(key); - } - - public void addChild(String key, SubscriptionNode childNode) { - _children.put(key, childNode); - } - - public void notifySubscribers(String senderAddress, String subject, Object args) { - for(Subscriber subscriber : _subscribers) { - subscriber.onPublishEvent(senderAddress, subject, args); - } - } - } -} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusEndpoint.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusEndpoint.java deleted file mode 100644 index 19a9b03..0000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusEndpoint.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cloudstack.framework.eventbus; - - -public class EventBusEndpoint { - private EventBus _eventBus; - private String _sender; - private PublishScope _scope; - - public EventBusEndpoint(EventBus eventBus, String sender, PublishScope scope) { - _eventBus = eventBus; - _sender = sender; - _scope = scope; - } - - public EventBusEndpoint setEventBus(EventBus eventBus) { - _eventBus = eventBus; - return this; - } - - public EventBusEndpoint setScope(PublishScope scope) { - _scope = scope; - return this; - } - - public PublishScope getScope() { - return _scope; - } - - public EventBusEndpoint setSender(String sender) { - _sender = sender; - return this; - } - - public String getSender() { - return _sender; - } - - public void Publish(String subject, Object args) { - assert(_eventBus != null); - _eventBus.publish(_sender, subject, _scope, args); - } -} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventDispatcher.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventDispatcher.java deleted file mode 100644 index 336a994..0000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventDispatcher.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.eventbus; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.HashMap; -import java.util.Map; - - -public class EventDispatcher implements Subscriber { - private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>(); - - private static Map<Object, EventDispatcher> s_targetMap = new HashMap<Object, EventDispatcher>(); - private Object _targetObject; - - public EventDispatcher(Object targetObject) { - _targetObject = targetObject; - } - - @Override - public void onPublishEvent(String senderAddress, String subject, Object args) { - dispatch(_targetObject, subject, senderAddress, args); - } - - public static EventDispatcher getDispatcher(Object targetObject) { - EventDispatcher dispatcher; - synchronized(s_targetMap) { - dispatcher = s_targetMap.get(targetObject); - if(dispatcher == null) { - dispatcher = new EventDispatcher(targetObject); - s_targetMap.put(targetObject, dispatcher); - } - } - return dispatcher; - } - - public static void removeDispatcher(Object targetObject) { - synchronized(s_targetMap) { - s_targetMap.remove(targetObject); - } - } - - public static boolean dispatch(Object target, String subject, String senderAddress, Object args) { - assert(subject != null); - assert(target != null); - - Method handler = resolveHandler(target.getClass(), subject); - if(handler == null) - return false; - - try { - handler.invoke(target, subject, senderAddress, args); - } catch (IllegalArgumentException e) { - throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject); - } catch (IllegalAccessException e) { - throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject); - } catch (InvocationTargetException e) { - throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject); - } - - return true; - } - - public static Method resolveHandler(Class<?> handlerClz, String subject) { - synchronized(s_handlerCache) { - Method handler = s_handlerCache.get(handlerClz); - if(handler != null) - return handler; - - for(Method method : handlerClz.getMethods()) { - EventHandler annotation = method.getAnnotation(EventHandler.class); - if(annotation != null) { - if(match(annotation.topic(), subject)) { - s_handlerCache.put(handlerClz, method); - return method; - } - } - } - } - - return null; - } - - private static boolean match(String expression, String param) { - return param.matches(expression); - } -} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventHandler.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventHandler.java deleted file mode 100644 index 1ed3a00..0000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventHandler.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.eventbus; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -@Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.METHOD) -public @interface EventHandler { - public String topic(); -} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/PublishScope.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/PublishScope.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/PublishScope.java deleted file mode 100644 index 539a242..0000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/PublishScope.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cloudstack.framework.eventbus; - -public enum PublishScope { - LOCAL, GLOBAL -} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/eventbus/Subscriber.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/Subscriber.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/Subscriber.java deleted file mode 100644 index 28b86de..0000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/Subscriber.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cloudstack.framework.eventbus; - -public interface Subscriber { - void onPublishEvent(String senderAddress, String subject, Object args); -} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java new file mode 100644 index 0000000..4aa007d --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cloudstack.framework.messagebus; + +import org.apache.cloudstack.framework.serializer.MessageSerializer; + +public interface MessageBus { + void setMessageSerializer(MessageSerializer messageSerializer); + MessageSerializer getMessageSerializer(); + + void subscribe(String subject, MessageSubscriber subscriber); + void unsubscribe(String subject, MessageSubscriber subscriber); + + void publish(String senderAddress, String subject, PublishScope scope, Object args); +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java new file mode 100644 index 0000000..5b7af4d --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cloudstack.framework.messagebus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.cloudstack.framework.serializer.MessageSerializer; + +public class MessageBusBase implements MessageBus { + + private Gate _gate; + private List<ActionRecord> _pendingActions; + + private SubscriptionNode _subscriberRoot; + private MessageSerializer _messageSerializer; + + public MessageBusBase() { + _gate = new Gate(); + _pendingActions = new ArrayList<ActionRecord>(); + + _subscriberRoot = new SubscriptionNode("/", null); + } + + @Override + public void setMessageSerializer(MessageSerializer messageSerializer) { + _messageSerializer = messageSerializer; + } + + @Override + public MessageSerializer getMessageSerializer() { + return _messageSerializer; + } + + @Override + public void subscribe(String subject, MessageSubscriber subscriber) { + assert(subject != null); + assert(subscriber != null); + if(_gate.enter()) { + SubscriptionNode current = locate(subject, null, true); + assert(current != null); + current.addSubscriber(subscriber); + _gate.leave(); + } else { + synchronized(_pendingActions) { + _pendingActions.add(new ActionRecord(ActionType.Subscribe, subject, subscriber)); + } + } + } + + @Override + public void unsubscribe(String subject, MessageSubscriber subscriber) { + if(_gate.enter()) { + SubscriptionNode current = locate(subject, null, false); + if(current != null) + current.removeSubscriber(subscriber); + + _gate.leave(); + } else { + synchronized(_pendingActions) { + _pendingActions.add(new ActionRecord(ActionType.Unsubscribe, subject, subscriber)); + } + } + } + + @Override + public void publish(String senderAddress, String subject, PublishScope scope, + Object args) { + + if(_gate.enter(true)) { + + List<SubscriptionNode> chainFromTop = new ArrayList<SubscriptionNode>(); + SubscriptionNode current = locate(subject, chainFromTop, false); + + if(current != null) + current.notifySubscribers(senderAddress, subject, args); + + Collections.reverse(chainFromTop); + for(SubscriptionNode node : chainFromTop) + node.notifySubscribers(senderAddress, subject, args); + + _gate.leave(); + } + } + + private void onGateOpen() { + synchronized(_pendingActions) { + ActionRecord record = null; + if(_pendingActions.size() > 0) { + while((record = _pendingActions.remove(0)) != null) { + switch(record.getType()) { + case Subscribe : + { + SubscriptionNode current = locate(record.getSubject(), null, true); + assert(current != null); + current.addSubscriber(record.getSubscriber()); + } + break; + + case Unsubscribe : + { + SubscriptionNode current = locate(record.getSubject(), null, false); + if(current != null) + current.removeSubscriber(record.getSubscriber()); + } + break; + + default : + assert(false); + break; + + } + } + } + } + } + + + private SubscriptionNode locate(String subject, List<SubscriptionNode> chainFromTop, + boolean createPath) { + + assert(subject != null); + + String[] subjectPathTokens = subject.split("\\."); + return locate(subjectPathTokens, _subscriberRoot, chainFromTop, createPath); + } + + private static SubscriptionNode locate(String[] subjectPathTokens, + SubscriptionNode current, List<SubscriptionNode> chainFromTop, boolean createPath) { + + assert(current != null); + assert(subjectPathTokens != null); + assert(subjectPathTokens.length > 0); + + if(chainFromTop != null) + chainFromTop.add(current); + + SubscriptionNode next = current.getChild(subjectPathTokens[0]); + if(next == null) { + if(createPath) { + next = new SubscriptionNode(subjectPathTokens[0], null); + current.addChild(subjectPathTokens[0], next); + } else { + return null; + } + } + + if(subjectPathTokens.length > 1) { + return locate((String[])Arrays.copyOfRange(subjectPathTokens, 1, subjectPathTokens.length), + next, chainFromTop, createPath); + } else { + return next; + } + } + + + // + // Support inner classes + // + private static enum ActionType { + Subscribe, + Unsubscribe + } + + private static class ActionRecord { + private ActionType _type; + private String _subject; + private MessageSubscriber _subscriber; + + public ActionRecord(ActionType type, String subject, MessageSubscriber subscriber) { + _type = type; + _subject = subject; + _subscriber = subscriber; + } + + public ActionType getType() { + return _type; + } + + public String getSubject() { + return _subject; + } + + public MessageSubscriber getSubscriber() { + return _subscriber; + } + } + + private class Gate { + private int _reentranceCount; + private Thread _gateOwner; + + public Gate() { + _reentranceCount = 0; + _gateOwner = null; + } + + public boolean enter() { + return enter(false); + } + + public boolean enter(boolean wait) { + while(true) { + synchronized(this) { + if(_reentranceCount == 0) { + assert(_gateOwner == null); + + _reentranceCount++; + _gateOwner = Thread.currentThread(); + return true; + } else { + if(wait) { + try { + wait(); + } catch (InterruptedException e) { + } + } else { + break; + } + } + } + } + + return false; + } + + public void leave() { + synchronized(this) { + if(_reentranceCount > 0) { + assert(_gateOwner == Thread.currentThread()); + + onGateOpen(); + _reentranceCount--; + assert(_reentranceCount == 0); + _gateOwner = null; + + notifyAll(); + } + } + } + } + + private static class SubscriptionNode { + @SuppressWarnings("unused") + private String _nodeKey; + private List<MessageSubscriber> _subscribers; + private Map<String, SubscriptionNode> _children; + + public SubscriptionNode(String nodeKey, MessageSubscriber subscriber) { + assert(nodeKey != null); + _nodeKey = nodeKey; + _subscribers = new ArrayList<MessageSubscriber>(); + + if(subscriber != null) + _subscribers.add(subscriber); + + _children = new HashMap<String, SubscriptionNode>(); + } + + @SuppressWarnings("unused") + public List<MessageSubscriber> getSubscriber() { + return _subscribers; + } + + public void addSubscriber(MessageSubscriber subscriber) { + _subscribers.add(subscriber); + } + + public void removeSubscriber(MessageSubscriber subscriber) { + _subscribers.remove(subscriber); + } + + public SubscriptionNode getChild(String key) { + return _children.get(key); + } + + public void addChild(String key, SubscriptionNode childNode) { + _children.put(key, childNode); + } + + public void notifySubscribers(String senderAddress, String subject, Object args) { + for(MessageSubscriber subscriber : _subscribers) { + subscriber.onPublishMessage(senderAddress, subject, args); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusEndpoint.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusEndpoint.java new file mode 100644 index 0000000..0824e13 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusEndpoint.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cloudstack.framework.messagebus; + + +public class MessageBusEndpoint { + private MessageBus _eventBus; + private String _sender; + private PublishScope _scope; + + public MessageBusEndpoint(MessageBus eventBus, String sender, PublishScope scope) { + _eventBus = eventBus; + _sender = sender; + _scope = scope; + } + + public MessageBusEndpoint setEventBus(MessageBus eventBus) { + _eventBus = eventBus; + return this; + } + + public MessageBusEndpoint setScope(PublishScope scope) { + _scope = scope; + return this; + } + + public PublishScope getScope() { + return _scope; + } + + public MessageBusEndpoint setSender(String sender) { + _sender = sender; + return this; + } + + public String getSender() { + return _sender; + } + + public void Publish(String subject, Object args) { + assert(_eventBus != null); + _eventBus.publish(_sender, subject, _scope, args); + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java new file mode 100644 index 0000000..ac75afb --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messagebus; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + + +public class MessageDispatcher implements MessageSubscriber { + private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>(); + + private static Map<Object, MessageDispatcher> s_targetMap = new HashMap<Object, MessageDispatcher>(); + private Object _targetObject; + + public MessageDispatcher(Object targetObject) { + _targetObject = targetObject; + } + + @Override + public void onPublishMessage(String senderAddress, String subject, Object args) { + dispatch(_targetObject, subject, senderAddress, args); + } + + public static MessageDispatcher getDispatcher(Object targetObject) { + MessageDispatcher dispatcher; + synchronized(s_targetMap) { + dispatcher = s_targetMap.get(targetObject); + if(dispatcher == null) { + dispatcher = new MessageDispatcher(targetObject); + s_targetMap.put(targetObject, dispatcher); + } + } + return dispatcher; + } + + public static void removeDispatcher(Object targetObject) { + synchronized(s_targetMap) { + s_targetMap.remove(targetObject); + } + } + + public static boolean dispatch(Object target, String subject, String senderAddress, Object args) { + assert(subject != null); + assert(target != null); + + Method handler = resolveHandler(target.getClass(), subject); + if(handler == null) + return false; + + try { + handler.invoke(target, subject, senderAddress, args); + } catch (IllegalArgumentException e) { + throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject); + } catch (IllegalAccessException e) { + throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject); + } catch (InvocationTargetException e) { + throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject); + } + + return true; + } + + public static Method resolveHandler(Class<?> handlerClz, String subject) { + synchronized(s_handlerCache) { + Method handler = s_handlerCache.get(handlerClz); + if(handler != null) + return handler; + + for(Method method : handlerClz.getMethods()) { + MessageHandler annotation = method.getAnnotation(MessageHandler.class); + if(annotation != null) { + if(match(annotation.topic(), subject)) { + s_handlerCache.put(handlerClz, method); + return method; + } + } + } + } + + return null; + } + + private static boolean match(String expression, String param) { + return param.matches(expression); + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageHandler.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageHandler.java new file mode 100644 index 0000000..d9f51df --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageHandler.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messagebus; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface MessageHandler { + public String topic(); +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageSubscriber.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageSubscriber.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageSubscriber.java new file mode 100644 index 0000000..072f98d --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageSubscriber.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cloudstack.framework.messagebus; + +public interface MessageSubscriber { + void onPublishMessage(String senderAddress, String subject, Object args); +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/messagebus/PublishScope.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/PublishScope.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/PublishScope.java new file mode 100644 index 0000000..2b3d8ac --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/PublishScope.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cloudstack.framework.messagebus; + +public enum PublishScope { + LOCAL, GLOBAL +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java index 11bc428..f3b782d 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java @@ -18,10 +18,10 @@ */ package org.apache.cloudstack.framework.server; -import org.apache.cloudstack.framework.eventbus.EventBusBase; +import org.apache.cloudstack.framework.messagebus.MessageBusBase; import org.apache.cloudstack.framework.transport.TransportMultiplexier; -public class ServerEventBus extends EventBusBase implements TransportMultiplexier { +public class ServerEventBus extends MessageBusBase implements TransportMultiplexier { @Override public void onTransportMessage(String senderEndpointAddress, http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent.java ---------------------------------------------------------------------- diff --git a/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent.java b/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent.java index 7b0a2ec..d59fe42 100644 --- a/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent.java +++ b/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent.java @@ -24,9 +24,9 @@ import java.util.TimerTask; import javax.annotation.PostConstruct; import javax.inject.Inject; -import org.apache.cloudstack.framework.eventbus.EventBus; -import org.apache.cloudstack.framework.eventbus.EventDispatcher; -import org.apache.cloudstack.framework.eventbus.EventHandler; +import org.apache.cloudstack.framework.messagebus.MessageBus; +import org.apache.cloudstack.framework.messagebus.MessageDispatcher; +import org.apache.cloudstack.framework.messagebus.MessageHandler; import org.apache.cloudstack.framework.rpc.RpcCallbackListener; import org.apache.cloudstack.framework.rpc.RpcException; import org.apache.cloudstack.framework.rpc.RpcProvider; @@ -41,7 +41,7 @@ public class SampleManagerComponent { private static final Logger s_logger = Logger.getLogger(SampleManagerComponent.class); @Inject - private EventBus _eventBus; + private MessageBus _eventBus; @Inject private RpcProvider _rpcProvider; @@ -58,7 +58,7 @@ public class SampleManagerComponent { // subscribe to all network events (for example) _eventBus.subscribe("network", - EventDispatcher.getDispatcher(this)); + MessageDispatcher.getDispatcher(this)); _timer.schedule(new TimerTask() { public void run() { @@ -72,7 +72,7 @@ public class SampleManagerComponent { call.completeCall("NetworkPrepare completed"); } - @EventHandler(topic="network.prepare") + @MessageHandler(topic="network.prepare") void onPrepareNetwork(String sender, String topic, Object args) { } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent2.java ---------------------------------------------------------------------- diff --git a/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent2.java b/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent2.java index dc482c0..448c37b 100644 --- a/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent2.java +++ b/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagerComponent2.java @@ -21,9 +21,9 @@ package org.apache.cloudstack.framework.sampleserver; import javax.annotation.PostConstruct; import javax.inject.Inject; -import org.apache.cloudstack.framework.eventbus.EventBus; -import org.apache.cloudstack.framework.eventbus.EventDispatcher; -import org.apache.cloudstack.framework.eventbus.EventHandler; +import org.apache.cloudstack.framework.messagebus.MessageBus; +import org.apache.cloudstack.framework.messagebus.MessageDispatcher; +import org.apache.cloudstack.framework.messagebus.MessageHandler; import org.apache.cloudstack.framework.rpc.RpcProvider; import org.apache.cloudstack.framework.rpc.RpcServerCall; import org.apache.cloudstack.framework.rpc.RpcServiceDispatcher; @@ -36,7 +36,7 @@ public class SampleManagerComponent2 { private static final Logger s_logger = Logger.getLogger(SampleManagerComponent2.class); @Inject - private EventBus _eventBus; + private MessageBus _eventBus; @Inject private RpcProvider _rpcProvider; @@ -51,7 +51,7 @@ public class SampleManagerComponent2 { // subscribe to all network events (for example) _eventBus.subscribe("storage", - EventDispatcher.getDispatcher(this)); + MessageDispatcher.getDispatcher(this)); } @RpcServiceHandler(command="StoragePrepare") @@ -66,7 +66,7 @@ public class SampleManagerComponent2 { call.completeCall(answer); } - @EventHandler(topic="storage.prepare") + @MessageHandler(topic="storage.prepare") void onPrepareNetwork(String sender, String topic, Object args) { } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/85e73d18/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/manager/VmwareManagerImpl.java ---------------------------------------------------------------------- diff --git a/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/manager/VmwareManagerImpl.java b/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/manager/VmwareManagerImpl.java index eb09af0..9f260f1 100755 --- a/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/manager/VmwareManagerImpl.java +++ b/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/manager/VmwareManagerImpl.java @@ -494,7 +494,8 @@ public class VmwareManagerImpl extends ManagerBase implements VmwareManager, Vmw s_logger.info("Inject SSH key pairs before copying systemvm.iso into secondary storage"); _configServer.updateKeyPairs(); - + s_logger.info("Copy System VM patch ISO file to secondary storage. source ISO: " + srcIso.getAbsolutePath() + + ", destination: " + destIso.getAbsolutePath()); try { FileUtil.copyfile(srcIso, destIso); } catch(IOException e) {
