Author: bdekruijff at gmail.com
Date: Mon Dec 20 16:42:38 2010
New Revision: 517
Log:
[sandbox] Made member messaging inherently asynchronous
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
Mon Dec 20 16:42:38 2010
@@ -23,6 +23,8 @@
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.amdatu.core.fabric.cluster.ClusterMember;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
@@ -36,6 +38,8 @@
private final Set<ClusterMember> m_clusterMembers = new
HashSet<ClusterMember>();
private final Set<ClusterMessageListener> m_clusterMessageListeners = new
HashSet<ClusterMessageListener>();
+ private final ExecutorService m_executorService =
Executors.newFixedThreadPool(1);
+
private final String m_clusterId;
private final String m_memberId;
private final Map<String, Object> m_properties;
@@ -81,7 +85,7 @@
}
public final void broadcast(Object message) {
- doBroadcast(message);
+ m_executorService.submit(new BroadcastRunnable(message));
}
public final void subscribe(ClusterMessageListener clusterMessageListener)
{
@@ -117,7 +121,7 @@
protected final void dispatchMessage(Object message) {
synchronized (m_clusterMessageListeners) {
for (ClusterMessageListener clm : m_clusterMessageListeners) {
- clm.recieveMessage(message);
+ m_executorService.submit(new MessageDispatchRunnable(clm,
message));
}
}
}
@@ -129,4 +133,36 @@
}
public abstract void doBroadcast(Object message);
+
+ /********************************************************
+ * helper classes
+ ********************************************************/
+
+ class BroadcastRunnable implements Runnable {
+
+ private final Object m_message;
+
+ public BroadcastRunnable(final Object message) {
+ m_message = message;
+ }
+
+ public void run() {
+ doBroadcast(m_message);
+ }
+ }
+
+ static class MessageDispatchRunnable implements Runnable {
+
+ private final ClusterMessageListener m_clusterMessageListener;
+ private final Object m_message;
+
+ public MessageDispatchRunnable(final ClusterMessageListener
clusterMessageListener, final Object message) {
+ m_clusterMessageListener = clusterMessageListener;
+ m_message = message;
+ }
+
+ public void run() {
+ m_clusterMessageListener.recieveMessage(m_message);
+ }
+ }
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
Mon Dec 20 16:42:38 2010
@@ -54,7 +54,8 @@
Dictionary<String, Object> cm1props = new Hashtable<String, Object>();
cm1props.put(ClusterMemberService.CLUSTER_CLUSTERID_PROP, "CLUSTER1");
cm1props.put(ClusterMemberService.CLUSTER_MEMBERID_PROP, memberid);
- cm1props.put(ClusterMemberServiceImpl.CLUSTER_TRIBES_ARGS_PROP, new
String[] { "-port", "8880" });
+ cm1props
+ .put(ClusterMemberServiceImpl.CLUSTER_TRIBES_ARGS_PROP, new
String[] { "-port", "8880", "-throughput" });
// ClusterMemberService