Updated Branches:
  refs/heads/javelin 0a7d03c90 -> 24530a274

Architecture refactoring - Stateless management server - EventBus


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/24530a27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/24530a27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/24530a27

Branch: refs/heads/javelin
Commit: 24530a274e44279c337b145e0432f64b67545bfa
Parents: 0a7d03c
Author: Kelven Yang <[email protected]>
Authored: Mon Oct 15 18:03:09 2012 -0700
Committer: Kelven Yang <[email protected]>
Committed: Mon Oct 15 18:10:38 2012 -0700

----------------------------------------------------------------------
 utils/src/com/cloud/utils/events/EventBus.java     |   25 ++
 utils/src/com/cloud/utils/events/EventBusBase.java |  292 +++++++++++++++
 utils/src/com/cloud/utils/events/EventsTest.java   |   66 ----
 utils/src/com/cloud/utils/events/PublishScope.java |   23 ++
 utils/src/com/cloud/utils/events/Subscriber.java   |   22 ++
 5 files changed, 362 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/24530a27/utils/src/com/cloud/utils/events/EventBus.java
----------------------------------------------------------------------
diff --git a/utils/src/com/cloud/utils/events/EventBus.java b/utils/src/com/cloud/utils/events/EventBus.java
new file mode 100644
index 0000000..4195acd
--- /dev/null
+++ b/utils/src/com/cloud/utils/events/EventBus.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.utils.events;
+
+/**
+ * Publish/subscribe contract: subscribers register interest in a subject and
+ * publishers post events to a subject.
+ *
+ * How a published subject is matched against subscriptions is left to the
+ * implementation. {@code EventBusBase}, introduced in the same change, treats
+ * subjects as dot-separated paths and also notifies subscribers registered on
+ * the parent subjects of the one being published.
+ */
+public interface EventBus {
+       /**
+        * Registers {@code subscriber} to be called back for events on {@code subject}.
+        *
+        * @param subject    subject to listen on
+        * @param subscriber callback receiver
+        */
+       void subscribe(String subject, Subscriber subscriber);
+
+       /**
+        * Removes a registration previously made with
+        * {@link #subscribe(String, Subscriber)} for the same subject.
+        *
+        * @param subject    subject the subscriber was registered on
+        * @param subscriber callback receiver to remove
+        */
+       void unsubscribe(String subject, Subscriber subscriber);
+
+       /**
+        * Publishes an event.
+        *
+        * @param subject subject the event is published on
+        * @param scope   requested delivery scope; NOTE(review): the only
+        *                implementation in this change ({@code EventBusBase})
+        *                ignores it - confirm the intended semantics
+        * @param sender  originator of the event, handed to subscribers unchanged
+        * @param args    event payload, handed to subscribers unchanged
+        */
+       void publish(String subject, PublishScope scope, Object sender, String args);
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/24530a27/utils/src/com/cloud/utils/events/EventBusBase.java
----------------------------------------------------------------------
diff --git a/utils/src/com/cloud/utils/events/EventBusBase.java b/utils/src/com/cloud/utils/events/EventBusBase.java
new file mode 100644
index 0000000..0c135db
--- /dev/null
+++ b/utils/src/com/cloud/utils/events/EventBusBase.java
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.utils.events;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import edu.emory.mathcs.backport.java.util.Arrays;
+import edu.emory.mathcs.backport.java.util.Collections;
+
+/**
+ * In-process {@link EventBus} that keeps subscribers in a tree keyed by the
+ * dot-separated tokens of a subject ({@code "a.b.c"} is root -> a -> b -> c).
+ *
+ * Publishing a subject notifies the subscribers of the exactly matching node
+ * first and then the subscribers of every node above it, nearest parent
+ * first, ending with the root.
+ *
+ * A single {@link Gate} serializes access to the tree. Publishers wait for the
+ * gate; subscribe/unsubscribe never wait: when the gate is held (for example
+ * because the call is made from inside a subscriber callback) the request is
+ * queued in {@code _pendingActions} and applied when the gate is released.
+ * This keeps the subscriber lists from being modified while they are iterated.
+ */
+public class EventBusBase implements EventBus {
+
+       private Gate _gate;
+       private List<ActionRecord> _pendingActions;
+
+       private SubscriptionNode _subscriberRoot;
+
+       public EventBusBase() {
+               _gate = new Gate();
+               _pendingActions = new ArrayList<ActionRecord>();
+
+               _subscriberRoot = new SubscriptionNode("/", null);
+       }
+
+       /**
+        * Registers {@code subscriber} on {@code subject}, creating the tree path
+        * for the subject if needed. If the gate is currently held the registration
+        * is deferred until the gate is released.
+        */
+       @Override
+       public void subscribe(String subject, Subscriber subscriber) {
+               assert(subject != null);
+               assert(subscriber != null);
+               if(_gate.enter()) {
+                       try {
+                               SubscriptionNode current = locate(subject, null, true);
+                               assert(current != null);
+                               current.addSubscriber(subscriber);
+                       } finally {
+                               // always reopen the gate, otherwise one failure blocks the bus forever
+                               _gate.leave();
+                       }
+               } else {
+                       // NOTE(review): if the gate owner finishes draining between the failed
+                       // enter() above and this add, the record stays queued until the next
+                       // time the gate is released.
+                       synchronized(_pendingActions) {
+                               _pendingActions.add(new ActionRecord(ActionType.Subscribe, subject, subscriber));
+                       }
+               }
+       }
+
+       /**
+        * Removes {@code subscriber} from {@code subject}; a subject that has no
+        * node in the tree is ignored. If the gate is currently held the removal
+        * is deferred until the gate is released.
+        */
+       @Override
+       public void unsubscribe(String subject, Subscriber subscriber) {
+               if(_gate.enter()) {
+                       try {
+                               SubscriptionNode current = locate(subject, null, false);
+                               if(current != null)
+                                       current.removeSubscriber(subscriber);
+                       } finally {
+                               _gate.leave();
+                       }
+               } else {
+                       synchronized(_pendingActions) {
+                               _pendingActions.add(new ActionRecord(ActionType.Unsubscribe, subject, subscriber));
+                       }
+               }
+       }
+
+       /**
+        * Delivers the event on the calling thread: first to subscribers of the
+        * exact subject (if such a node exists), then to subscribers of each node
+        * that was visited on the way down, from the deepest one up to the root.
+        * {@code scope} is not used by this implementation.
+        *
+        * Blocks until the gate is available. A subscriber may publish again from
+        * its callback; the nested call re-enters the gate on the same thread.
+        */
+       @Override
+       public void publish(String subject, PublishScope scope, Object sender,
+               String args) {
+
+               if(_gate.enter(true)) {
+                       try {
+                               List<SubscriptionNode> chainFromTop = new ArrayList<SubscriptionNode>();
+                               SubscriptionNode current = locate(subject, chainFromTop, false);
+
+                               if(current != null)
+                                       current.notifySubscribers(subject, sender, args);
+
+                               // chainFromTop is root-first; deliver nearest parent first
+                               Collections.reverse(chainFromTop);
+                               for(SubscriptionNode node : chainFromTop)
+                                       node.notifySubscribers(subject, sender, args);
+                       } finally {
+                               // a throwing subscriber must not leave the gate closed
+                               _gate.leave();
+                       }
+               }
+       }
+
+       /**
+        * Applies, in arrival order, the subscribe/unsubscribe requests that were
+        * queued while the gate was held. Called by {@link Gate#leave()} while the
+        * gate is still owned by the leaving thread.
+        */
+       private void onGateOpen() {
+               synchronized(_pendingActions) {
+                       // List.remove(0) throws on an empty list, so test emptiness instead
+                       // of waiting for a null element
+                       while(!_pendingActions.isEmpty()) {
+                               ActionRecord record = _pendingActions.remove(0);
+                               switch(record.getType()) {
+                               case Subscribe :
+                                       {
+                                               SubscriptionNode current = locate(record.getSubject(), null, true);
+                                               assert(current != null);
+                                               current.addSubscriber(record.getSubscriber());
+                                       }
+                                       break;
+
+                               case Unsubscribe :
+                                       {
+                                               SubscriptionNode current = locate(record.getSubject(), null, false);
+                                               if(current != null)
+                                                       current.removeSubscriber(record.getSubscriber());
+                                       }
+                                       break;
+
+                               default :
+                                       assert(false);
+                                       break;
+
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Splits {@code subject} on '.' and resolves it from the root.
+        *
+        * @param subject      dot-separated subject, must not be null
+        * @param chainFromTop if not null, receives every node visited above the
+        *                     returned node, root first
+        * @param createPath   whether missing nodes are created
+        * @return the node for the subject, or null if it does not exist and
+        *         {@code createPath} is false
+        */
+       private SubscriptionNode locate(String subject, List<SubscriptionNode> chainFromTop,
+               boolean createPath) {
+
+               assert(subject != null);
+
+               String[] subjectPathTokens = subject.split("\\.");
+               return locate(subjectPathTokens, _subscriberRoot, chainFromTop, createPath);
+       }
+
+       /**
+        * Walks down from {@code current} one token at a time. Each node is added
+        * to {@code chainFromTop} before its child is looked up, so on a failed
+        * lookup the chain still holds the nodes that were reached.
+        */
+       private static SubscriptionNode locate(String[] subjectPathTokens,
+               SubscriptionNode current, List<SubscriptionNode> chainFromTop, boolean createPath) {
+
+               assert(current != null);
+               assert(subjectPathTokens != null);
+               assert(subjectPathTokens.length > 0);
+
+               SubscriptionNode node = current;
+               for(String token : subjectPathTokens) {
+                       if(chainFromTop != null)
+                               chainFromTop.add(node);
+
+                       SubscriptionNode next = node.getChild(token);
+                       if(next == null) {
+                               if(!createPath)
+                                       return null;
+
+                               next = new SubscriptionNode(token, null);
+                               node.addChild(token, next);
+                       }
+                       node = next;
+               }
+               return node;
+       }
+
+
+       //
+       // Support inner classes
+       //
+       private static enum ActionType {
+               Subscribe,
+               Unsubscribe
+       }
+
+       /** A subscribe/unsubscribe request queued while the gate was held. */
+       private static class ActionRecord {
+               private ActionType _type;
+               private String _subject;
+               private Subscriber _subscriber;
+
+               public ActionRecord(ActionType type, String subject, Subscriber subscriber) {
+                       _type = type;
+                       _subject = subject;
+                       _subscriber = subscriber;
+               }
+
+               public ActionType getType() {
+                       return _type;
+               }
+
+               public String getSubject() {
+                       return _subject;
+               }
+
+               public Subscriber getSubscriber() {
+                       return _subscriber;
+               }
+       }
+
+       /**
+        * Mutual-exclusion gate around the subscription tree. Unlike a plain lock
+        * it can be tried without blocking, and it runs {@link #onGateOpen()} just
+        * before it is released.
+        */
+       private class Gate {
+               private int _reentranceCount;
+               private Thread _gateOwner;
+
+               public Gate() {
+                       _reentranceCount = 0;
+                       _gateOwner = null;
+               }
+
+               /** Tries to take the gate without waiting; see {@link #enter(boolean)}. */
+               public boolean enter() {
+                       return enter(false);
+               }
+
+               /**
+                * Takes the gate.
+                *
+                * @param wait false: return false immediately when the gate is held,
+                *             even by the calling thread, so that the caller defers its
+                *             change; true: block until the gate is free, or re-enter
+                *             at once when the calling thread already owns it
+                * @return true if the caller now holds the gate and must call
+                *         {@link #leave()}
+                */
+               public boolean enter(boolean wait) {
+                       boolean interrupted = false;
+                       try {
+                               synchronized(this) {
+                                       while(true) {
+                                               if(_reentranceCount == 0) {
+                                                       assert(_gateOwner == null);
+
+                                                       _reentranceCount = 1;
+                                                       _gateOwner = Thread.currentThread();
+                                                       return true;
+                                               }
+
+                                               if(!wait)
+                                                       return false;
+
+                                               if(_gateOwner == Thread.currentThread()) {
+                                                       // waiting here could never end: only this thread can
+                                                       // release the gate
+                                                       _reentranceCount++;
+                                                       return true;
+                                               }
+
+                                               try {
+                                                       wait();
+                                               } catch (InterruptedException e) {
+                                                       // keep waiting, but remember the request so the
+                                                       // caller still sees it
+                                                       interrupted = true;
+                                               }
+                                       }
+                               }
+                       } finally {
+                               if(interrupted)
+                                       Thread.currentThread().interrupt();
+                       }
+               }
+
+               /**
+                * Releases one level of ownership. When the outermost level is
+                * released, queued actions are applied first and waiting threads are
+                * then woken up.
+                */
+               public void leave() {
+                       synchronized(this) {
+                               if(_reentranceCount > 0) {
+                                       assert(_gateOwner == Thread.currentThread());
+
+                                       if(_reentranceCount > 1) {
+                                               _reentranceCount--;
+                                               return;
+                                       }
+
+                                       try {
+                                               onGateOpen();
+                                       } finally {
+                                               // release even if applying a queued action failed
+                                               _reentranceCount = 0;
+                                               _gateOwner = null;
+
+                                               notifyAll();
+                                       }
+                               }
+                       }
+               }
+       }
+
+       /** One node of the subject tree: its own subscribers plus child nodes by token. */
+       private static class SubscriptionNode {
+               private String _nodeKey;
+               private List<Subscriber> _subscribers;
+               private Map<String, SubscriptionNode> _children;
+
+               public SubscriptionNode(String nodeKey, Subscriber subscriber) {
+                       assert(nodeKey != null);
+                       _nodeKey = nodeKey;
+                       _subscribers = new ArrayList<Subscriber>();
+
+                       if(subscriber != null)
+                               _subscribers.add(subscriber);
+
+                       _children = new HashMap<String, SubscriptionNode>();
+               }
+
+               public List<Subscriber> getSubscriber() {
+                       return _subscribers;
+               }
+
+               public void addSubscriber(Subscriber subscriber) {
+                       _subscribers.add(subscriber);
+               }
+
+               public void removeSubscriber(Subscriber subscriber) {
+                       _subscribers.remove(subscriber);
+               }
+
+               public SubscriptionNode getChild(String key) {
+                       return _children.get(key);
+               }
+
+               public void addChild(String key, SubscriptionNode childNode) {
+                       _children.put(key, childNode);
+               }
+
+               /** Calls every subscriber of this node, in registration order. */
+               public void notifySubscribers(String subject, Object sender, String args) {
+                       for(Subscriber subscriber : _subscribers) {
+                               subscriber.onPublishEvent(subject, sender, args);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/24530a27/utils/src/com/cloud/utils/events/EventsTest.java
----------------------------------------------------------------------
diff --git a/utils/src/com/cloud/utils/events/EventsTest.java b/utils/src/com/cloud/utils/events/EventsTest.java
deleted file mode 100644
index abef23e..0000000
--- a/utils/src/com/cloud/utils/events/EventsTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// the License.  You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.utils.events;
-
-public class EventsTest {
-       public void onWeatherChange(Object sender, EventArgs args) {
-               System.out.println("onWeatherChange, weather: " + 
((WeatherChangeEventArgs)args).getWeather());
-       }
-       
-       public void onTrafficChange(Object sender, EventArgs args) {
-               System.out.println("onTrafficChange");
-       }
-       
-       public void run() {
-               SubscriptionMgr mgr = SubscriptionMgr.getInstance();
-               try {
-                       mgr.subscribe("weather", this, "onWeatherChange");
-                       mgr.subscribe("traffic", this, "onTrafficChange");
-               } catch (SecurityException e) {
-                       e.printStackTrace();
-               } catch (NoSuchMethodException e) {
-                       e.printStackTrace();
-               }
-               
-               mgr.notifySubscribers("weather", null, new 
WeatherChangeEventArgs("weather", "Sunny"));
-               mgr.notifySubscribers("traffic", null, EventArgs.Empty);
-       }
-
-       public static void main(String[] args) {
-               EventsTest test = new EventsTest();
-               test.run();
-       }
-}
-
-class WeatherChangeEventArgs extends EventArgs {
-       private static final long serialVersionUID = -952166331523609047L;
-       
-       private String weather;
-       
-       public WeatherChangeEventArgs() {
-       }
-       
-       public WeatherChangeEventArgs(String subject, String weather) {
-               super(subject);
-               this.weather = weather;
-       }
-       
-       public String getWeather() { return weather; }
-       public void setWeather(String weather) {
-               this.weather = weather;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/24530a27/utils/src/com/cloud/utils/events/PublishScope.java
----------------------------------------------------------------------
diff --git a/utils/src/com/cloud/utils/events/PublishScope.java b/utils/src/com/cloud/utils/events/PublishScope.java
new file mode 100644
index 0000000..bab852a
--- /dev/null
+++ b/utils/src/com/cloud/utils/events/PublishScope.java
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.utils.events;
+
+/**
+ * Delivery scope passed to {@code EventBus.publish}.
+ *
+ * NOTE(review): nothing in this change reads the value - the only
+ * implementation, {@code EventBusBase}, ignores the scope argument.
+ * Presumably LOCAL means "this management server only" and GLOBAL means
+ * "all management servers"; confirm before relying on it.
+ */
+public enum PublishScope {
+       LOCAL,
+       GLOBAL
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/24530a27/utils/src/com/cloud/utils/events/Subscriber.java
----------------------------------------------------------------------
diff --git a/utils/src/com/cloud/utils/events/Subscriber.java b/utils/src/com/cloud/utils/events/Subscriber.java
new file mode 100644
index 0000000..7af283b
--- /dev/null
+++ b/utils/src/com/cloud/utils/events/Subscriber.java
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.utils.events;
+
+/**
+ * Callback interface for receivers registered through {@code EventBus.subscribe}.
+ */
+public interface Subscriber {
+       /**
+        * Called when an event is delivered to this subscriber.
+        *
+        * With {@code EventBusBase} the call is made on the publishing thread, and
+        * {@code subject} is the subject that was published, which may be a
+        * descendant of the subject this subscriber registered on.
+        *
+        * @param subject subject the event was published on
+        * @param sender  the sender object given to {@code publish}
+        * @param args    the payload string given to {@code publish}
+        */
+       void onPublishEvent(String subject, Object sender, String args);
+}

Reply via email to