Author: bdekruijff at gmail.com
Date: Mon Dec 20 16:42:38 2010
New Revision: 517

Log:
[sandbox] Made member messaging inherently asynchronous

Modified:
   
   sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
   sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
    (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
    Mon Dec 20 16:42:38 2010
@@ -23,6 +23,8 @@
 import java.util.Hashtable;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.amdatu.core.fabric.cluster.ClusterMember;
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
@@ -36,6 +38,8 @@
     private final Set<ClusterMember> m_clusterMembers = new HashSet<ClusterMember>();
     private final Set<ClusterMessageListener> m_clusterMessageListeners = new HashSet<ClusterMessageListener>();
 
+    private final ExecutorService m_executorService = Executors.newFixedThreadPool(1);
+
     private final String m_clusterId;
     private final String m_memberId;
     private final Map<String, Object> m_properties;
@@ -81,7 +85,7 @@
     }
 
     public final void broadcast(Object message) {
-        doBroadcast(message);
+        m_executorService.submit(new BroadcastRunnable(message));
     }
 
     public final void subscribe(ClusterMessageListener clusterMessageListener) {
@@ -117,7 +121,7 @@
     protected final void dispatchMessage(Object message) {
         synchronized (m_clusterMessageListeners) {
             for (ClusterMessageListener clm : m_clusterMessageListeners) {
-                clm.recieveMessage(message);
+                m_executorService.submit(new MessageDispatchRunnable(clm, message));
             }
         }
     }
@@ -129,4 +133,36 @@
     }
 
     public abstract void doBroadcast(Object message);
+
+    /********************************************************
+     * helper classes
+     ********************************************************/
+
+    class BroadcastRunnable implements Runnable {
+
+        private final Object m_message;
+
+        public BroadcastRunnable(final Object message) {
+            m_message = message;
+        }
+
+        public void run() {
+            doBroadcast(m_message);
+        }
+    }
+
+    static class MessageDispatchRunnable implements Runnable {
+
+        private final ClusterMessageListener m_clusterMessageListener;
+        private final Object m_message;
+
+        public MessageDispatchRunnable(final ClusterMessageListener clusterMessageListener, final Object message) {
+            m_clusterMessageListener = clusterMessageListener;
+            m_message = message;
+        }
+
+        public void run() {
+            m_clusterMessageListener.recieveMessage(m_message);
+        }
+    }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
  (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
  Mon Dec 20 16:42:38 2010
@@ -54,7 +54,8 @@
         Dictionary<String, Object> cm1props = new Hashtable<String, Object>();
         cm1props.put(ClusterMemberService.CLUSTER_CLUSTERID_PROP, "CLUSTER1");
         cm1props.put(ClusterMemberService.CLUSTER_MEMBERID_PROP, memberid);
-        cm1props.put(ClusterMemberServiceImpl.CLUSTER_TRIBES_ARGS_PROP, new String[] { "-port", "8880" });
+        cm1props
+            .put(ClusterMemberServiceImpl.CLUSTER_TRIBES_ARGS_PROP, new String[] { "-port", "8880", "-throughput" });
 
         // ClusterMemberService
 

Reply via email to