Author: bdekruijff at gmail.com
Date: Fri Dec 17 17:14:55 2010
New Revision: 513

Log:
[sandbox] fabric code improvements / optimizations

Modified:
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMessageServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMessageServiceImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMessageServiceImpl.java
       (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMessageServiceImpl.java
       Fri Dec 17 17:14:55 2010
@@ -27,7 +27,7 @@
 import org.amdatu.core.fabric.cluster.ClusterMessageService;
 import org.amdatu.core.fabric.cluster.ClusterTopicListener;
 
-public class ClusterMessageServiceImpl implements ClusterMessageService {
+public class ClusterMessageServiceImpl implements ClusterMessageService, 
ClusterMessageListener {
 
     private final Map<String, Set<ClusterTopicListener>> 
m_clusterTopicListeners =
         new HashMap<String, Set<ClusterTopicListener>>();
@@ -35,31 +35,21 @@
     // injected
     private volatile ClusterMemberService m_clusterMemberService;
 
-    private ClusterMessageListener m_clusterMessageListener;
-
-    /*
-     * service lifecycle
-     */
+    /********************************************************
+     * Service lifecycle
+     ********************************************************/
 
     public synchronized void start() {
-        m_clusterMessageListener = new ClusterMessageListener() {
-            public void recieveMessage(Object message) {
-                if (message instanceof TopicMessageWrapper) {
-                    dispatchMessages(((TopicMessageWrapper) 
message).getTopic(),
-                        ((TopicMessageWrapper) message).getMessage());
-                }
-            }
-        };
-        m_clusterMemberService.subscribe(m_clusterMessageListener);
+        m_clusterMemberService.subscribe(this);
     }
 
     public synchronized void stop() {
-        m_clusterMemberService.unsubscribe(m_clusterMessageListener);
+        m_clusterMemberService.unsubscribe(this);
     }
 
-    /*
-     * API
-     */
+    /********************************************************
+     * ClusterMessageService
+     ********************************************************/
 
     public void publish(String topic, Object message) {
         m_clusterMemberService.broadcast(new TopicMessageWrapper(topic, 
message));
@@ -88,20 +78,21 @@
         }
     }
 
-    /*
-     * private
-     */
-
-    private void dispatchMessages(String topic, Object message) {
-        if (topic == null || topic.equals("")) {
-            return;
-        }
-        synchronized (m_clusterTopicListeners) {
-            if (m_clusterTopicListeners.containsKey(topic)) {
-                // TODO async
-                for (ClusterTopicListener clusterTopicListener : 
m_clusterTopicListeners.get(topic)) {
-                    // TODO clone?
-                    clusterTopicListener.recieveMessage(message);
+    /********************************************************
+     * ClusterMessageListener
+     ********************************************************/
+
+    public void recieveMessage(Object message) {
+        if (message instanceof TopicMessageWrapper) {
+            TopicMessageWrapper topicMessageWrapper = (TopicMessageWrapper) 
message;
+            synchronized (m_clusterTopicListeners) {
+                if 
(m_clusterTopicListeners.containsKey(topicMessageWrapper.getTopic())) {
+                    // TODO async
+                    for (ClusterTopicListener clusterTopicListener : 
m_clusterTopicListeners.get(topicMessageWrapper
+                        .getTopic())) {
+                        // TODO clone?
+                        
clusterTopicListener.recieveMessage(topicMessageWrapper.getMessage());
+                    }
                 }
             }
         }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
     (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
     Fri Dec 17 17:14:55 2010
@@ -38,7 +38,7 @@
 /**
  * I keep track of local RemotableServiceEndpoint services and publish them in 
the cluster
  * I listen to the cluster and publish local RemoteServiceEndpoint services 
for them
- * TODO support LOOKUP requests for more fine grained discovery
+ * TODO support LOOKUP requests for more fine grained discovery (use 
ListenerHook)
  */
 public class DiscoveryServiceImpl implements DiscoveryService, 
ClusterTopicListener {
 

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
  (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
  Fri Dec 17 17:14:55 2010
@@ -157,7 +157,6 @@
 
     public void recieveMessage(Object message) {
         if (message instanceof EndpointInvokeMessage) {
-
             EndpointInvokeMessage endpointInvokeMessage = 
(EndpointInvokeMessage) message;
             Map<String, Object> payload = endpointInvokeMessage.getPayload();
             String invocationId = (String) 
payload.get(MESSAGE_INVOCATION_ID_KEY);
@@ -166,18 +165,9 @@
 
             String methodName = (String) 
payload.get(MESSAGE_INVOCATION_METHODNAME_KEY);
             Object[] args = (Object[]) 
payload.get(MESSAGE_INVOCATION_ARGUMENTS_KEY);
+            Class<?>[] types = getTypesFromArgs(args);
 
-            Class<?>[] types;
-            if (args == null) {
-                types = new Class[0];
-            }
-            else {
-                types = new Class[args.length];
-                for (int i = 0; i < args.length; i++) {
-                    types[i] = args[i].getClass();
-                }
-            }
-
+            Object serviceResponse = null;
             synchronized (m_serviceEndPointServiceReferenceComponents) {
                 ServiceReferenceComponentTuple serviceReferenceComponentTuple =
                     
m_serviceEndPointServiceReferenceComponents.get(serviceEndpoint);
@@ -192,20 +182,10 @@
                         // TODO local service gone.. what to do? Send back 
error to prevent client from waiting...
                         return;
                     }
-                    Method serviceMethod;
-                    serviceMethod = 
serviceObject.getClass().getMethod(methodName, types);
-                    Object serviceResponse;
+                    Method serviceMethod = 
serviceObject.getClass().getMethod(methodName, types);
                     serviceResponse = serviceMethod.invoke(serviceObject, 
args);
 
                     m_bundleContext.ungetService(serviceReference);
-
-                    Map<String, Object> responsePayload = new HashMap<String, 
Object>();
-                    responsePayload.put(MESSAGE_INVOCATION_ID_KEY, 
invocationId);
-                    responsePayload.put(MESSAGE_RESPONSE_MAP_KEY, 
serviceResponse);
-
-                    m_clusterMessageService
-                        .publish(REMOTE_TOPIC, new 
EndpointResponseMessage(serviceEndpoint, responsePayload));
-
                 }
                 catch (SecurityException e) {
                     // TODO its fooked.. what to do? Send back error to 
prevent client from waiting...
@@ -228,6 +208,13 @@
                     e.printStackTrace();
                 }
             }
+            if (serviceResponse != null) {
+                Map<String, Object> responsePayload = new HashMap<String, 
Object>();
+                responsePayload.put(MESSAGE_INVOCATION_ID_KEY, invocationId);
+                responsePayload.put(MESSAGE_RESPONSE_MAP_KEY, serviceResponse);
+                m_clusterMessageService
+                    .publish(REMOTE_TOPIC, new 
EndpointResponseMessage(serviceEndpoint, responsePayload));
+            }
         }
     }
 
@@ -282,22 +269,32 @@
         serviceEndPoint.setProperties(properties);
         return serviceEndPoint;
     }
-}
 
-class ServiceReferenceComponentTuple {
-    private final ServiceReference m_serviceReference;
-    private final Component m_component;
-
-    public ServiceReferenceComponentTuple(ServiceReference serviceReference, 
Component component) {
-        m_serviceReference = serviceReference;
-        m_component = component;
+    private Class[] getTypesFromArgs(Object[] args) {
+        if (args == null)
+            return new Class<?>[0];
+        Class<?>[] types = new Class[args.length];
+        for (int i = 0; i < args.length; i++) {
+            types[i] = args[i].getClass();
+        }
+        return types;
     }
 
-    public ServiceReference getServiceReference() {
-        return m_serviceReference;
-    }
+    static class ServiceReferenceComponentTuple {
+        private final ServiceReference m_serviceReference;
+        private final Component m_component;
+
+        public ServiceReferenceComponentTuple(ServiceReference 
serviceReference, Component component) {
+            m_serviceReference = serviceReference;
+            m_component = component;
+        }
+
+        public ServiceReference getServiceReference() {
+            return m_serviceReference;
+        }
 
-    public Component getComponent() {
-        return m_component;
+        public Component getComponent() {
+            return m_component;
+        }
     }
-}
+}
\ No newline at end of file

Reply via email to