Author: bdekruijff at gmail.com
Date: Fri Dec 17 17:14:55 2010
New Revision: 513
Log:
[sandbox] fabric code improvements / optmizations
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMessageServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMessageServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMessageServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMessageServiceImpl.java
Fri Dec 17 17:14:55 2010
@@ -27,7 +27,7 @@
import org.amdatu.core.fabric.cluster.ClusterMessageService;
import org.amdatu.core.fabric.cluster.ClusterTopicListener;
-public class ClusterMessageServiceImpl implements ClusterMessageService {
+public class ClusterMessageServiceImpl implements ClusterMessageService,
ClusterMessageListener {
private final Map<String, Set<ClusterTopicListener>>
m_clusterTopicListeners =
new HashMap<String, Set<ClusterTopicListener>>();
@@ -35,31 +35,21 @@
// injected
private volatile ClusterMemberService m_clusterMemberService;
- private ClusterMessageListener m_clusterMessageListener;
-
- /*
- * service lifecycle
- */
+ /********************************************************
+ * Service lifecycle
+ ********************************************************/
public synchronized void start() {
- m_clusterMessageListener = new ClusterMessageListener() {
- public void recieveMessage(Object message) {
- if (message instanceof TopicMessageWrapper) {
- dispatchMessages(((TopicMessageWrapper)
message).getTopic(),
- ((TopicMessageWrapper) message).getMessage());
- }
- }
- };
- m_clusterMemberService.subscribe(m_clusterMessageListener);
+ m_clusterMemberService.subscribe(this);
}
public synchronized void stop() {
- m_clusterMemberService.unsubscribe(m_clusterMessageListener);
+ m_clusterMemberService.unsubscribe(this);
}
- /*
- * API
- */
+ /********************************************************
+ * ClusterMessageService
+ ********************************************************/
public void publish(String topic, Object message) {
m_clusterMemberService.broadcast(new TopicMessageWrapper(topic,
message));
@@ -88,20 +78,21 @@
}
}
- /*
- * private
- */
-
- private void dispatchMessages(String topic, Object message) {
- if (topic == null || topic.equals("")) {
- return;
- }
- synchronized (m_clusterTopicListeners) {
- if (m_clusterTopicListeners.containsKey(topic)) {
- // TODO async
- for (ClusterTopicListener clusterTopicListener :
m_clusterTopicListeners.get(topic)) {
- // TODO clone?
- clusterTopicListener.recieveMessage(message);
+ /********************************************************
+ * ClusterMessageListener
+ ********************************************************/
+
+ public void recieveMessage(Object message) {
+ if (message instanceof TopicMessageWrapper) {
+ TopicMessageWrapper topicMessageWrapper = (TopicMessageWrapper)
message;
+ synchronized (m_clusterTopicListeners) {
+ if
(m_clusterTopicListeners.containsKey(topicMessageWrapper.getTopic())) {
+ // TODO async
+ for (ClusterTopicListener clusterTopicListener :
m_clusterTopicListeners.get(topicMessageWrapper
+ .getTopic())) {
+ // TODO clone?
+
clusterTopicListener.recieveMessage(topicMessageWrapper.getMessage());
+ }
}
}
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
Fri Dec 17 17:14:55 2010
@@ -38,7 +38,7 @@
/**
* I keep track of local RemotableServiceEndpoint services and publish them in
the cluster
* I listen to the cluster and publish local RemoteServiceEndpoint services
for them
- * TODO support LOOKUP requests for more fine grained discovery
+ * TODO support LOOKUP requests for more fine grained discovery (use
ListenerHook)
*/
public class DiscoveryServiceImpl implements DiscoveryService,
ClusterTopicListener {
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
Fri Dec 17 17:14:55 2010
@@ -157,7 +157,6 @@
public void recieveMessage(Object message) {
if (message instanceof EndpointInvokeMessage) {
-
EndpointInvokeMessage endpointInvokeMessage =
(EndpointInvokeMessage) message;
Map<String, Object> payload = endpointInvokeMessage.getPayload();
String invocationId = (String)
payload.get(MESSAGE_INVOCATION_ID_KEY);
@@ -166,18 +165,9 @@
String methodName = (String)
payload.get(MESSAGE_INVOCATION_METHODNAME_KEY);
Object[] args = (Object[])
payload.get(MESSAGE_INVOCATION_ARGUMENTS_KEY);
+ Class<?>[] types = getTypesFromArgs(args);
- Class<?>[] types;
- if (args == null) {
- types = new Class[0];
- }
- else {
- types = new Class[args.length];
- for (int i = 0; i < args.length; i++) {
- types[i] = args[i].getClass();
- }
- }
-
+ Object serviceResponse = null;
synchronized (m_serviceEndPointServiceReferenceComponents) {
ServiceReferenceComponentTuple serviceReferenceComponentTuple =
m_serviceEndPointServiceReferenceComponents.get(serviceEndpoint);
@@ -192,20 +182,10 @@
// TODO local service gone.. what to do? Send back
error to prevent client from waiting...
return;
}
- Method serviceMethod;
- serviceMethod =
serviceObject.getClass().getMethod(methodName, types);
- Object serviceResponse;
+ Method serviceMethod =
serviceObject.getClass().getMethod(methodName, types);
serviceResponse = serviceMethod.invoke(serviceObject,
args);
m_bundleContext.ungetService(serviceReference);
-
- Map<String, Object> responsePayload = new HashMap<String,
Object>();
- responsePayload.put(MESSAGE_INVOCATION_ID_KEY,
invocationId);
- responsePayload.put(MESSAGE_RESPONSE_MAP_KEY,
serviceResponse);
-
- m_clusterMessageService
- .publish(REMOTE_TOPIC, new
EndpointResponseMessage(serviceEndpoint, responsePayload));
-
}
catch (SecurityException e) {
// TODO its fooked.. what to do? Send back error to
prevent client from waiting...
@@ -228,6 +208,13 @@
e.printStackTrace();
}
}
+ if (serviceResponse != null) {
+ Map<String, Object> responsePayload = new HashMap<String,
Object>();
+ responsePayload.put(MESSAGE_INVOCATION_ID_KEY, invocationId);
+ responsePayload.put(MESSAGE_RESPONSE_MAP_KEY, serviceResponse);
+ m_clusterMessageService
+ .publish(REMOTE_TOPIC, new
EndpointResponseMessage(serviceEndpoint, responsePayload));
+ }
}
}
@@ -282,22 +269,32 @@
serviceEndPoint.setProperties(properties);
return serviceEndPoint;
}
-}
-class ServiceReferenceComponentTuple {
- private final ServiceReference m_serviceReference;
- private final Component m_component;
-
- public ServiceReferenceComponentTuple(ServiceReference serviceReference,
Component component) {
- m_serviceReference = serviceReference;
- m_component = component;
+ private Class[] getTypesFromArgs(Object[] args) {
+ if (args == null)
+ return new Class<?>[0];
+ Class<?>[] types = new Class[args.length];
+ for (int i = 0; i < args.length; i++) {
+ types[i] = args[i].getClass();
+ }
+ return types;
}
- public ServiceReference getServiceReference() {
- return m_serviceReference;
- }
+ static class ServiceReferenceComponentTuple {
+ private final ServiceReference m_serviceReference;
+ private final Component m_component;
+
+ public ServiceReferenceComponentTuple(ServiceReference
serviceReference, Component component) {
+ m_serviceReference = serviceReference;
+ m_component = component;
+ }
+
+ public ServiceReference getServiceReference() {
+ return m_serviceReference;
+ }
- public Component getComponent() {
- return m_component;
+ public Component getComponent() {
+ return m_component;
+ }
}
-}
+}
\ No newline at end of file