Author: bdekruijff at gmail.com
Date: Tue Dec 21 18:33:02 2010
New Revision: 524

Log:
[sandbox] improved concurrency/performance/logging, test code moved out

Added:
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java
Removed:
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/FabricTestService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/NonRemotableService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/RemotableService.java
   sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/internal/
   sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/service/
   sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/shell/
Modified:
   sandbox/bdekruijff/fabric/pom.xml
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMessageServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
   
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java

Modified: sandbox/bdekruijff/fabric/pom.xml
==============================================================================
--- sandbox/bdekruijff/fabric/pom.xml   (original)
+++ sandbox/bdekruijff/fabric/pom.xml   Tue Dec 21 18:33:02 2010
@@ -42,6 +42,7 @@
           <instructions>
             
<Bundle-Activator>org.amdatu.core.fabric.osgi.Activator</Bundle-Activator>
             <Bundle-SymbolicName>org.amdatu.core.fabric</Bundle-SymbolicName>
+            <DynamicImport-Package>*</DynamicImport-Package>
             <Export-Package>org.amdatu.core.fabric.cluster, 
org.amdatu.core.fabric.remote</Export-Package>
             <Embed-Dependency>*;scope=compile</Embed-Dependency>
             <Embed-Transitive>true</Embed-Transitive>

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
    (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
    Tue Dec 21 18:33:02 2010
@@ -31,8 +31,12 @@
 
     ClusterMember[] getClusterMembers();
 
+    ClusterMember getClusterMember(String memberId);
+
     void broadcast(Object message);
 
+    void send(ClusterMember[] clusterMembers, Object message);
+
     void subscribe(ClusterMessageListener clusterMessageListener);
 
     void unsubscribe(ClusterMessageListener clusterMessageListener);

Added: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java
==============================================================================
--- (empty file)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java
  Tue Dec 21 18:33:02 2010
@@ -0,0 +1,27 @@
+package org.amdatu.core.fabric.cluster.internal;
+
+import java.io.Serializable;
+
+public abstract class RoutedMessage implements Serializable {
+
+    private final String m_targetClusterId;
+    private final String[] m_targetMemberIds;
+
+    public RoutedMessage(final String clusterId, final String memberId) {
+        m_targetClusterId = clusterId;
+        m_targetMemberIds = new String[] { memberId };
+    }
+
+    public RoutedMessage(final String clusterId, String[] memberIds) {
+        m_targetClusterId = clusterId;
+        m_targetMemberIds = memberIds;
+    }
+
+    public final String getClusterId() {
+        return m_targetClusterId;
+    }
+
+    public final String[] getMemberIds() {
+        return m_targetMemberIds;
+    }
+}
\ No newline at end of file

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
    (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
    Tue Dec 21 18:33:02 2010
@@ -25,6 +25,7 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMember;
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
@@ -35,9 +36,12 @@
  */
 public abstract class AbstractClusterMemberService implements 
ClusterMemberService {
 
-    private final Set<ClusterMember> m_clusterMembers = new 
HashSet<ClusterMember>();
+    private final Map<String, ClusterMember> m_clusterMembers = new 
HashMap<String, ClusterMember>();
     private final Set<ClusterMessageListener> m_clusterMessageListeners = new 
HashSet<ClusterMessageListener>();
 
+    private final ReentrantReadWriteLock m_clusterMembersLock = new 
ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock m_clusterMessageListenersLock = new 
ReentrantReadWriteLock();
+
     private final ExecutorService m_executorService = 
Executors.newFixedThreadPool(1);
 
     private final String m_clusterId;
@@ -79,8 +83,22 @@
     }
 
     public final ClusterMember[] getClusterMembers() {
-        synchronized (m_clusterMembers) {
-            return m_clusterMembers.toArray(new 
ClusterMember[m_clusterMembers.size()]);
+        m_clusterMembersLock.readLock().lock();
+        try {
+            return m_clusterMembers.values().toArray(new 
ClusterMember[m_clusterMembers.size()]);
+        }
+        finally {
+            m_clusterMembersLock.readLock().unlock();
+        }
+    }
+
+    public final ClusterMember getClusterMember(String memberId) {
+        m_clusterMembersLock.readLock().lock();
+        try {
+            return m_clusterMembers.get(memberId);
+        }
+        finally {
+            m_clusterMembersLock.readLock().unlock();
         }
     }
 
@@ -88,17 +106,29 @@
         m_executorService.submit(new BroadcastRunnable(message));
     }
 
+    public final void send(ClusterMember[] clusterMembers, Object message) {
+        m_executorService.submit(new SendRunnable(clusterMembers, message));
+    }
+
     public final void subscribe(ClusterMessageListener clusterMessageListener) 
{
-        synchronized (m_clusterMessageListeners) {
+        m_clusterMessageListenersLock.writeLock().lock();
+        try {
             m_clusterMessageListeners.add(clusterMessageListener);
         }
+        finally {
+            m_clusterMessageListenersLock.writeLock().unlock();
+        }
         onSubscribe(clusterMessageListener);
     }
 
     public final void unsubscribe(ClusterMessageListener 
clusterMessageListener) {
-        synchronized (m_clusterMessageListeners) {
+        m_clusterMessageListenersLock.writeLock().lock();
+        try {
             m_clusterMessageListeners.remove(clusterMessageListener);
         }
+        finally {
+            m_clusterMessageListenersLock.writeLock().unlock();
+        }
         onUnsubscribe(clusterMessageListener);
     }
 
@@ -107,23 +137,35 @@
      ********************************************************/
 
     protected final void addClusterMember(ClusterMember clusterMember) {
-        synchronized (m_clusterMembers) {
-            m_clusterMembers.add(clusterMember);
+        m_clusterMembersLock.writeLock().lock();
+        try {
+            m_clusterMembers.put(clusterMember.getId(), clusterMember);
+        }
+        finally {
+            m_clusterMembersLock.writeLock().unlock();
         }
     }
 
     protected final void removeClusterMember(ClusterMember clusterMember) {
-        synchronized (m_clusterMembers) {
-            m_clusterMembers.remove(clusterMember);
+        m_clusterMembersLock.writeLock().lock();
+        try {
+            m_clusterMembers.remove(clusterMember.getId());
+        }
+        finally {
+            m_clusterMembersLock.writeLock().unlock();
         }
     }
 
     protected final void dispatchMessage(Object message) {
-        synchronized (m_clusterMessageListeners) {
+        m_clusterMessageListenersLock.readLock().lock();
+        try {
             for (ClusterMessageListener clm : m_clusterMessageListeners) {
                 m_executorService.submit(new MessageDispatchRunnable(clm, 
message));
             }
         }
+        finally {
+            m_clusterMessageListenersLock.readLock().unlock();
+        }
     }
 
     protected void onSubscribe(ClusterMessageListener clusterMessageListener) {
@@ -134,6 +176,8 @@
 
     public abstract void doBroadcast(Object message);
 
+    public abstract void doSend(ClusterMember[] clusterMember, Object message);
+
     /********************************************************
      * helper classes
      ********************************************************/
@@ -151,6 +195,21 @@
         }
     }
 
+    class SendRunnable implements Runnable {
+
+        private final ClusterMember[] m_clusterMember;
+        private final Object m_message;
+
+        public SendRunnable(final ClusterMember[] clusterMembers, final Object 
message) {
+            m_clusterMember = clusterMembers;
+            m_message = message;
+        }
+
+        public void run() {
+            doSend(m_clusterMember, m_message);
+        }
+    }
+
     static class MessageDispatchRunnable implements Runnable {
 
         private final ClusterMessageListener m_clusterMessageListener;

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMessageServiceImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMessageServiceImpl.java
       (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMessageServiceImpl.java
       Tue Dec 21 18:33:02 2010
@@ -18,22 +18,35 @@
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.amdatu.core.fabric.cluster.ClusterMember;
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
 import org.amdatu.core.fabric.cluster.ClusterMessageListener;
 import org.amdatu.core.fabric.cluster.ClusterMessageService;
 import org.amdatu.core.fabric.cluster.ClusterTopicListener;
+import org.amdatu.core.fabric.cluster.internal.RoutedMessage;
 import org.amdatu.core.fabric.cluster.internal.TopicMessageWrapper;
+import org.osgi.service.log.LogService;
 
 public class ClusterMessageServiceImpl implements ClusterMessageService, 
ClusterMessageListener {
 
     private final Map<String, Set<ClusterTopicListener>> 
m_clusterTopicListeners =
         new HashMap<String, Set<ClusterTopicListener>>();
 
+    private final ReentrantReadWriteLock m_clusterTopicListenersLock = new 
ReentrantReadWriteLock();
+
+    private final ExecutorService m_executorService = 
Executors.newFixedThreadPool(1);
+
     // injected
     private volatile ClusterMemberService m_clusterMemberService;
+    private volatile LogService m_logService;
 
     /********************************************************
      * Constructors
@@ -59,31 +72,75 @@
      * ClusterMessageService
      ********************************************************/
 
-    public void publish(String topic, Object message) {
-        m_clusterMemberService.broadcast(new TopicMessageWrapper(topic, 
message));
+    public void publish(final String topic, final Object message) {
+        if (message instanceof RoutedMessage) {
+            RoutedMessage routedMessage = (RoutedMessage) message;
+            if (routedMessage.getClusterId() == null
+                || 
!routedMessage.getClusterId().equals(m_clusterMemberService.getClusterId())) {
+                m_logService.log(LogService.LOG_ERROR,
+                    "RoutedMessage is not for this cluster: " + 
routedMessage.getClusterId());
+                return;
+            }
+            if (routedMessage.getMemberIds() == null || 
routedMessage.getMemberIds().length == 0) {
+                m_logService.log(LogService.LOG_ERROR, "RoutedMessage does not 
specify any target members");
+                return;
+            }
+            List<ClusterMember> clusterMembers = new 
LinkedList<ClusterMember>();
+            for (String memberId : routedMessage.getMemberIds()) {
+                ClusterMember clusterMember = 
m_clusterMemberService.getClusterMember(memberId);
+                if (clusterMember != null) {
+                    clusterMembers.add(clusterMember);
+                }
+                else {
+                    m_logService.log(LogService.LOG_WARNING, "RoutedMessage 
specifies unknown target member: "
+                        + memberId);
+                }
+            }
+            if (clusterMembers.size() > 0) {
+                m_clusterMemberService.send(clusterMembers.toArray(new 
ClusterMember[clusterMembers.size()]),
+                    new TopicMessageWrapper(topic, message));
+                return;
+            }
+            m_logService.log(LogService.LOG_WARNING, "RoutedMessage has no 
available target members left");
+            // send dropped message to avoid blocking
+        }
+        else {
+            m_clusterMemberService.broadcast(new TopicMessageWrapper(topic, 
message));
+        }
     }
 
-    public void subscribe(ClusterTopicListener clusterTopicListener) {
+    public void subscribe(final ClusterTopicListener clusterTopicListener) {
         if (clusterTopicListener.getTopic() == null || 
clusterTopicListener.getTopic().equals("")) {
             return;
         }
-        synchronized (m_clusterTopicListeners) {
+        m_clusterTopicListenersLock.writeLock().lock();
+        try {
             if 
(!m_clusterTopicListeners.containsKey(clusterTopicListener.getTopic())) {
                 m_clusterTopicListeners.put(clusterTopicListener.getTopic(), 
new HashSet<ClusterTopicListener>());
             }
             
m_clusterTopicListeners.get(clusterTopicListener.getTopic()).add(clusterTopicListener);
         }
+        finally {
+            m_clusterTopicListenersLock.writeLock().unlock();
+        }
     }
 
     public void unsubscribe(ClusterTopicListener clusterTopicListener) {
         if (clusterTopicListener.getTopic() == null || 
clusterTopicListener.getTopic().equals("")) {
             return;
         }
-        synchronized (m_clusterTopicListeners) {
+        m_clusterTopicListenersLock.writeLock().lock();
+        try {
             if 
(m_clusterTopicListeners.containsKey(clusterTopicListener.getTopic())) {
                 
m_clusterTopicListeners.get(clusterTopicListener.getTopic()).remove(clusterTopicListener);
+                if 
(m_clusterTopicListeners.get(clusterTopicListener.getTopic()).size() == 0) {
+                    
m_clusterTopicListeners.remove(clusterTopicListener.getTopic());
+                }
             }
         }
+        finally {
+            m_clusterTopicListenersLock.writeLock().unlock();
+        }
     }
 
     /********************************************************
@@ -93,16 +150,38 @@
     public void recieveMessage(Object message) {
         if (message instanceof TopicMessageWrapper) {
             TopicMessageWrapper topicMessageWrapper = (TopicMessageWrapper) 
message;
-            synchronized (m_clusterTopicListeners) {
+            m_clusterTopicListenersLock.readLock().lock();
+            try {
                 if 
(m_clusterTopicListeners.containsKey(topicMessageWrapper.getTopic())) {
-                    // TODO async
                     for (ClusterTopicListener clusterTopicListener : 
m_clusterTopicListeners.get(topicMessageWrapper
                         .getTopic())) {
-                        // TODO clone?
-                        
clusterTopicListener.recieveMessage(topicMessageWrapper.getMessage());
+                        m_executorService.submit(new 
MessageDispatchRunnable(clusterTopicListener, topicMessageWrapper
+                            .getMessage()));
                     }
                 }
             }
+            finally {
+                m_clusterTopicListenersLock.readLock().unlock();
+            }
+        }
+    }
+
+    /********************************************************
+     * helper classes
+     ********************************************************/
+
+    static class MessageDispatchRunnable implements Runnable {
+
+        private final ClusterTopicListener m_clusterTopicListener;
+        private final Object m_message;
+
+        public MessageDispatchRunnable(final ClusterTopicListener 
clusterTopicListener, final Object message) {
+            m_clusterTopicListener = clusterTopicListener;
+            m_message = message;
+        }
+
+        public void run() {
+            m_clusterTopicListener.recieveMessage(m_message);
         }
     }
 }
\ No newline at end of file

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java
 (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java
 Tue Dec 21 18:33:02 2010
@@ -21,8 +21,14 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.amdatu.core.fabric.cluster.ClusterMember;
 import org.amdatu.core.fabric.cluster.internal.ClusterMemberImpl;
 import org.amdatu.core.fabric.cluster.internal.tribes.ChannelCreator;
 import org.amdatu.core.fabric.cluster.service.AbstractClusterMemberService;
@@ -32,14 +38,16 @@
 import org.apache.catalina.tribes.ManagedChannel;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.util.Arrays;
-import org.apache.catalina.tribes.util.UUIDGenerator;
 
 public final class ClusterMemberServiceImpl extends 
AbstractClusterMemberService implements MembershipListener,
     ChannelListener {
 
     public static final String CLUSTER_TRIBES_ARGS_PROP = 
"org.amdatu.fabric.cluster.tribes.args";
 
+    private final Map<ClusterMember, Member> m_clusterMemberMembers = new 
HashMap<ClusterMember, Member>();
+
+    private final ReentrantReadWriteLock m_clusterMemberMembersLock = new 
ReentrantReadWriteLock();
+
     private ManagedChannel m_managedChannel;
 
     /********************************************************
@@ -61,14 +69,12 @@
                     CLUSTER_TRIBES_ARGS_PROP));
 
             Properties props = new Properties();
-            props.setProperty(CLUSTER_MEMBERID_PROP, 
Arrays.toString(UUIDGenerator.randomUUID(true)));
+            props.setProperty(CLUSTER_MEMBERID_PROP, getMemberId());
 
             m_managedChannel.addMembershipListener(this);
             m_managedChannel.addChannelListener(this);
             
m_managedChannel.getMembershipService().setPayload(getPayload(props));
 
-// 
m_managedChannel.getMembershipService().setDomain("Amdatu".getBytes(Charset.forName("ISO-8859-1")));
-
             m_managedChannel.start(Channel.DEFAULT);
         }
         catch (Exception e) {
@@ -105,15 +111,46 @@
         }
     }
 
+    @Override
+    public void doSend(ClusterMember[] clusterMembers, Object message) {
+        // TODO check and wrap message. Look into send options
+        if (message instanceof Serializable) {
+            try {
+                List<Member> members = new LinkedList<Member>();
+                synchronized (m_clusterMemberMembers) {
+                    for (ClusterMember clusterMember : clusterMembers) {
+                        Member member = 
m_clusterMemberMembers.get(clusterMember);
+                        if (member != null) {
+                            members.add(member);
+                        }
+                    }
+                }
+                if (members.size() > 0)
+                    m_managedChannel.send(members.toArray(new 
Member[members.size()]), (Serializable) message,
+                        Channel.SEND_OPTIONS_ASYNCHRONOUS);
+            }
+            catch (ChannelException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
     /********************************************************
      * MembershipListener
      ********************************************************/
 
     public void memberAdded(Member member) {
         try {
-            System.out.println("Received member added:" + member);
-            addClusterMember(new 
ClusterMemberImpl(getProperties(member.getPayload())
-                .getProperty(CLUSTER_MEMBERID_PROP)));
+            ClusterMember clusterMember = new 
ClusterMemberImpl(getProperties(member.getPayload())
+                .getProperty(CLUSTER_MEMBERID_PROP));
+            m_clusterMemberMembersLock.writeLock().lock();
+            try {
+                m_clusterMemberMembers.put(clusterMember, member);
+            }
+            finally {
+                m_clusterMemberMembersLock.writeLock().unlock();
+            }
+            addClusterMember(clusterMember);
         }
         catch (Exception x) {
             x.printStackTrace();
@@ -122,9 +159,26 @@
 
     public void memberDisappeared(Member member) {
         try {
-            System.out.println("Received member disappeared:" + member);
-            removeClusterMember(new 
ClusterMemberImpl(getProperties(member.getPayload())
-                .getProperty(CLUSTER_MEMBERID_PROP)));
+            // FIXME use memberid to tuple map to reduce object creation
+            String memberId = getProperties(member.getPayload())
+                .getProperty(CLUSTER_MEMBERID_PROP);
+            ClusterMember toBeRemoved = null;
+            m_clusterMemberMembersLock.writeLock().lock();
+            try {
+                for (ClusterMember clusterMember : 
m_clusterMemberMembers.keySet()) {
+                    if (clusterMember.getId().equals(memberId)) {
+                        toBeRemoved = clusterMember;
+                        break;
+                    }
+                }
+                if (toBeRemoved != null) {
+                    m_clusterMemberMembers.remove(toBeRemoved);
+                }
+            }
+            finally {
+                m_clusterMemberMembersLock.writeLock().unlock();
+            }
+            removeClusterMember(toBeRemoved);
         }
         catch (Exception x) {
             x.printStackTrace();
@@ -159,4 +213,23 @@
         props.load(bin);
         return props;
     }
+
+    static class ClusterMemberMemberTuple {
+
+        private final ClusterMember m_clusterMember;
+        private final Member m_member;
+
+        public ClusterMemberMemberTuple(final ClusterMember clusterMember, 
final Member member) {
+            m_clusterMember = clusterMember;
+            m_member = member;
+        }
+
+        public ClusterMember getClusterMember() {
+            return m_clusterMember;
+        }
+
+        public Member getMember() {
+            return m_member;
+        }
+    }
 }
\ No newline at end of file

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
  (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
  Tue Dec 21 18:33:02 2010
@@ -33,10 +33,6 @@
 import org.amdatu.core.fabric.remote.service.DiscoveryServiceImpl;
 import org.amdatu.core.fabric.remote.service.DistributionServiceImpl;
 import org.amdatu.core.fabric.remote.shell.DiscoveryListCommand;
-import org.amdatu.core.fabric.test.FabricTestService;
-import org.amdatu.core.fabric.test.RemotableService;
-import org.amdatu.core.fabric.test.service.FabricTestServiceImpl;
-import org.amdatu.core.fabric.test.shell.FabricTestCommand;
 import org.apache.felix.dm.DependencyActivatorBase;
 import org.apache.felix.dm.DependencyManager;
 import org.apache.felix.shell.Command;
@@ -115,26 +111,7 @@
                         .setRequired(false))
                 
.add(createServiceDependency().setService(LogService.class).setRequired(false)));
 
-        // Test & Shell
-
-        manager.add(
-            createComponent()
-                .setImplementation(new FabricTestServiceImpl())
-                .setInterface(FabricTestService.class.getName(), null)
-                
.add(createServiceDependency().setService(LogService.class).setRequired(false)));
-
-        manager.add(
-            createComponent()
-                .setImplementation(new FabricTestCommand())
-                .setInterface(Command.class.getName(), null)
-                .add(
-                    
createServiceDependency().setService(FabricTestService.class).setRequired(true))
-                .add(
-                    
createServiceDependency().setService(RemotableService.class).setRequired(false)
-                        .setCallbacks("remotableServiceAdded", 
"remotableServiceRemoved"))
-                .add(
-                    
createServiceDependency().setService(LogService.class).setRequired(false)
-                        ));
+        // Shell
 
         manager.add(
             createComponent()

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
      (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
      Tue Dec 21 18:33:02 2010
@@ -38,7 +38,7 @@
     final static String SERVICE_CONFIGURATION_TYPE = "org.amdatu.core.fabric";
 
     final static String SERVICE_EXPORTED_CONFIGS_PROP = 
"service.exported.configs";
-    final static String SERVICE_EXPORTED_INTENTS_PROP = 
"service.exported.intent";
+    final static String SERVICE_EXPORTED_INTENTS_PROP = 
"service.exported.intents";
     final static String SERVICE_EXPORTED_INTENTS_EXTRA_PROP = 
"service.exported.intents.extra";
     final static String SERVICE_EXPORTED_INTERFACES_PROP = 
"service.exported.iterfaces";
 

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
  (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
  Tue Dec 21 18:33:02 2010
@@ -26,7 +26,6 @@
     private String m_memberId;
     private String[] m_objectClass;
     private long m_originalServiceId;
-    private long m_localServiceId;
     private Hashtable<String, Object> m_properties;
 
     public ServiceEndPoint() {
@@ -64,14 +63,6 @@
         m_originalServiceId = originalServiceId;
     }
 
-    public long getLocalServiceId() {
-        return m_localServiceId;
-    }
-
-    public void setLocalServiceId(long localServiceId) {
-        m_localServiceId = localServiceId;
-    }
-
     public Hashtable<String, Object> getProperties() {
         return m_properties;
     }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
   (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
   Tue Dec 21 18:33:02 2010
@@ -19,14 +19,16 @@
 import java.io.Serializable;
 import java.util.Map;
 
+import org.amdatu.core.fabric.cluster.internal.RoutedMessage;
 import org.amdatu.core.fabric.remote.ServiceEndPoint;
 
-public class EndpointInvokeMessage implements Serializable {
+public class EndpointInvokeMessage extends RoutedMessage implements 
Serializable {
 
     private ServiceEndPoint m_serviceEndPoint;
     private Map<String, Object> m_payload;
 
     public EndpointInvokeMessage(ServiceEndPoint serviceEndPoint, Map<String, 
Object> payload) {
+        super(serviceEndPoint.getClusterId(), serviceEndPoint.getMemberId());
         m_serviceEndPoint = serviceEndPoint;
         m_payload = payload;
     }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
 (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
 Tue Dec 21 18:33:02 2010
@@ -19,22 +19,19 @@
 import java.io.Serializable;
 import java.util.Map;
 
+import org.amdatu.core.fabric.cluster.internal.RoutedMessage;
 import org.amdatu.core.fabric.remote.ServiceEndPoint;
 
-public class EndpointResponseMessage implements Serializable {
+public class EndpointResponseMessage extends RoutedMessage implements 
Serializable {
 
-    private ServiceEndPoint m_serviceEndPoint;
     private Map<String, Object> m_payload;
 
-    public EndpointResponseMessage(ServiceEndPoint serviceEndPoint, 
Map<String, Object> payload) {
-        m_serviceEndPoint = serviceEndPoint;
+    public EndpointResponseMessage(final String originClusterId, final String 
originMemberId,
+        final Map<String, Object> payload) {
+        super(originClusterId, originMemberId);
         m_payload = payload;
     }
 
-    public ServiceEndPoint getServiceEndPoint() {
-        return m_serviceEndPoint;
-    }
-
     public Map<String, Object> getPayload() {
         return m_payload;
     }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
   (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
   Tue Dec 21 18:33:02 2010
@@ -18,68 +18,89 @@
 
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.amdatu.core.fabric.cluster.ClusterMemberService;
 import org.amdatu.core.fabric.cluster.ClusterMessageService;
 import org.amdatu.core.fabric.cluster.ClusterTopicListener;
 import org.amdatu.core.fabric.remote.DistributionService;
 import org.amdatu.core.fabric.remote.ServiceEndPoint;
+import org.amdatu.core.fabric.remote.service.DistributionServiceImpl;
 
 /**
  * I am a delegate to the DistributionService
  * I proxy local service invocations and put them over the cluster
  */
-public class LocalServiceInvocationHandler implements InvocationHandler, 
ClusterTopicListener {
+public final class LocalServiceInvocationHandler implements InvocationHandler, ClusterTopicListener {
 
-    private final static String MESSAGE_INVOCATION_ID_KEY = "MII";
-    private final static String MESSAGE_INVOCATION_SERVICEENDPOINTINFO_KEY = 
"MIEI";
-    private final static String MESSAGE_INVOCATION_METHODNAME_KEY = "MIMK";
-    private final static String MESSAGE_INVOCATION_ARGUMENTS_KEY = "MIAK";
-    private final static String MESSAGE_RESPONSE_MAP_KEY = "MRM";
-    private final static long INVOCATION_TIMEOUT = 10000;
+    private final static long INVOCATION_TIMEOUT = 100;
 
     private final Set<String> m_invocationIdentifiers = new HashSet<String>();
-    private final Map<String, Object> myInvocationResponses = new 
HashMap<String, Object>();
+    private final Map<String, Object> m_invocationResponses = new 
HashMap<String, Object>();
 
+    private final ReentrantReadWriteLock m_invocationIdentifiersLock = new 
ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock m_invocationResponsesLock = new 
ReentrantReadWriteLock();
+
+    private volatile ClusterMemberService m_clusterMemberService;
     private volatile ClusterMessageService m_clusterMessageService;
 
     private final ServiceEndPoint m_serviceEndpoint;
     private final Class<?>[] m_interfaceClasses;
+    private final Set<Method> m_interfaceMethods = new HashSet<Method>();
 
     private String m_serviceEndpointTopic;
 
+    /********************************************************
+     * Constructors
+     ********************************************************/
+
     public LocalServiceInvocationHandler(ServiceEndPoint serviceEndpoint, 
Class<?>[] interfaceClasses) {
         m_serviceEndpoint = serviceEndpoint;
         m_interfaceClasses = interfaceClasses;
         m_serviceEndpointTopic = DistributionService.REMOTE_TOPIC;
+
+        for (Class<?> interfaceClass : interfaceClasses) {
+            for (Method serviceMethod : interfaceClass.getMethods()) {
+                m_interfaceMethods.add(serviceMethod);
+            }
+        }
+    }
+
+    /********************************************************
+     * Access
+     ********************************************************/
+
+    public ServiceEndPoint getServiceEndPoint() {
+        return m_serviceEndpoint;
     }
 
-    /*
-     * lifecycle
-     */
+    /********************************************************
+     * Service lifecycle
+     ********************************************************/
 
-    public void start() {
+    public synchronized void start() {
         m_clusterMessageService.subscribe(this);
     }
 
-    public void stop() {
+    public synchronized void stop() {
         m_clusterMessageService.unsubscribe(this);
     }
 
-    public ServiceEndPoint getServiceEndPoint() {
-        return m_serviceEndpoint;
-    }
 
-    /*
-     * InvocationHandler interface
-     */
-    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+    /********************************************************
+     * InvocationHandler
+     ********************************************************/
 
-        if (isServiceIntefaceInvocation(method)) {
+    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+        if (isServiceInterfaceInvocation(method)) {
             String invocationIdentifier = createNewInvocationIdentifier();
             Map<String, Object> payload = 
getInvocationPayload(invocationIdentifier, method, args);
             m_clusterMessageService.publish(m_serviceEndpointTopic, new 
EndpointInvokeMessage(m_serviceEndpoint,
@@ -90,9 +111,10 @@
         return method.invoke(this, args);
     }
 
-    /*
-     * ClusterTopicListener interface
-     */
+    /********************************************************
+     * ClusterTopicListener
+     ********************************************************/
+
     public String getTopic() {
         return m_serviceEndpointTopic;
     }
@@ -101,19 +123,44 @@
         if (message instanceof EndpointResponseMessage) {
             EndpointResponseMessage endpointResponseMessage = 
(EndpointResponseMessage) message;
             Map<String, Object> payload = endpointResponseMessage.getPayload();
-            String invocationId = (String) 
payload.get(MESSAGE_INVOCATION_ID_KEY);
+            String invocationId = (String) 
payload.get(DistributionServiceImpl.MESSAGE_INVOCATION_ID_KEY);
             if (ownsInvocationIndentifier(invocationId)) {
-                Object response = payload.get(MESSAGE_RESPONSE_MAP_KEY);
+                Object response = 
payload.get(DistributionServiceImpl.MESSAGE_INVOCATION_RESPONSE_MAP_KEY);
                 storeResponseObject(invocationId, response);
                 removeInvocationIdentifier(invocationId);
-                System.err.println("Stored response: " + response);
             }
         }
     }
 
-    /*
+    /********************************************************
+     * Object
+     ********************************************************/
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof Proxy) {
+            InvocationHandler invocationHandler = 
Proxy.getInvocationHandler(obj);
+            if (invocationHandler instanceof LocalServiceInvocationHandler) {
+                return getServiceEndPoint().equals(
+                    ((LocalServiceInvocationHandler) 
invocationHandler).getServiceEndPoint());
+            }
+            return false;
+        }
+        if (obj instanceof LocalServiceInvocationHandler) {
+            return getServiceEndPoint().equals(
+                ((LocalServiceInvocationHandler) obj).getServiceEndPoint());
+        }
+        return super.equals(obj);
+    }
+
+    @Override
+    public int hashCode() {
+        return getServiceEndPoint().hashCode();
+    }
+
+    /********************************************************
      * private
-     */
+     ********************************************************/
 
     private String createNewInvocationIdentifier() {
         String invocationId = UUID.randomUUID().toString();
@@ -125,10 +172,13 @@
 
     private Map<String, Object> getInvocationPayload(String invocationId, 
Method method, Object[] args) {
         Map<String, Object> payload = new HashMap<String, Object>();
-        payload.put(MESSAGE_INVOCATION_SERVICEENDPOINTINFO_KEY, 
m_serviceEndpoint);
-        payload.put(MESSAGE_INVOCATION_ID_KEY, invocationId);
-        payload.put(MESSAGE_INVOCATION_METHODNAME_KEY, method.getName());
-        payload.put(MESSAGE_INVOCATION_ARGUMENTS_KEY, args);
+        payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ID_KEY, 
invocationId);
+        payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_METHODNAME_KEY, 
method.getName());
+        payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ARGUMENTS_KEY, 
args);
+        
payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ORIGIN_CLUSTERID_KEY,
+            m_clusterMemberService.getClusterId());
+        
payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ORIGIN_MEMBERID_KEY,
+            m_clusterMemberService.getMemberId());
         return payload;
     }
 
@@ -141,16 +191,14 @@
             if (invocationResponseRecieved(invocationId)) {
                 response = retrieveInvocationResponseObject(invocationId);
                 isResponseRecieved = true;
-                System.err.println("Recieved repsonse for invocation response: 
" + invocationId);
             }
             else {
                 isResponseTimedOut = isTimedOut(invocationStart);
             }
         }
         if (isResponseTimedOut) {
-            System.err.println("Timed out waiting for invocation response: " + 
invocationId);
+            // FIXME now what?
         }
-        System.err.println("Invocation took " + (System.currentTimeMillis() - 
invocationStart) + " ms");
         return response;
     }
 
@@ -158,44 +206,58 @@
         return (System.currentTimeMillis() - invocationStart) > 
INVOCATION_TIMEOUT;
     }
 
-    private boolean invocationResponseRecieved(String invocationId) {
-        synchronized (myInvocationResponses) {
-            return myInvocationResponses.containsKey(invocationId);
+    private boolean invocationResponseRecieved(final String invocationId) {
+        m_invocationResponsesLock.readLock().lock();
+        try {
+            return m_invocationResponses.containsKey(invocationId);
+        }
+        finally {
+            m_invocationResponsesLock.readLock().unlock();
         }
     }
 
     private Object retrieveInvocationResponseObject(final String invocationId) {
-        synchronized (myInvocationResponses) {
-            return myInvocationResponses.remove(invocationId);
+        m_invocationResponsesLock.writeLock().lock();
+        try {
+            return m_invocationResponses.remove(invocationId);
+        }
+        finally {
+            m_invocationResponsesLock.writeLock().unlock();
         }
     }
 
     private boolean ownsInvocationIndentifier(final String invocationId) {
-        synchronized (m_invocationIdentifiers) {
+        m_invocationIdentifiersLock.readLock().lock();
+        try {
             return m_invocationIdentifiers.contains(invocationId);
         }
+        finally {
+            m_invocationIdentifiersLock.readLock().unlock();
+        }
     }
 
     private void storeResponseObject(final String invocationId, final Object 
payload) {
-        synchronized (myInvocationResponses) {
-            myInvocationResponses.put(invocationId, payload);
+        m_invocationResponsesLock.writeLock().lock();
+        try {
+            m_invocationResponses.put(invocationId, payload);
+        }
+        finally {
+            m_invocationResponsesLock.writeLock().unlock();
         }
     }
 
     private void removeInvocationIdentifier(final String invocationId) {
-        synchronized (m_invocationIdentifiers) {
+        m_invocationIdentifiersLock.writeLock().lock();
+        try {
             m_invocationIdentifiers.remove(invocationId);
         }
+        finally {
+            m_invocationIdentifiersLock.writeLock().unlock();
+        }
     }
 
-    private boolean isServiceIntefaceInvocation(Method method) {
-        for (Class<?> serviceInterfaceClass : m_interfaceClasses) {
-            for (Method serviceMethod : serviceInterfaceClass.getMethods()) {
-                if (method.equals(serviceMethod)) {
-                    return true;
-                }
-            }
-        }
-        return false;
+    private boolean isServiceInterfaceInvocation(final Method method) {
+        // no lock needed
+        return m_interfaceMethods.contains(method);
     }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
     (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
     Tue Dec 21 18:33:02 2010
@@ -20,6 +20,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMessageService;
 import org.amdatu.core.fabric.cluster.ClusterTopicListener;
@@ -43,8 +44,11 @@
 public class DiscoveryServiceImpl implements DiscoveryService, 
ClusterTopicListener {
 
     private final Set<ServiceEndPoint> m_remotableEndPoints = new 
HashSet<ServiceEndPoint>();
+    private final ReentrantReadWriteLock m_remotableEndPointsLock = new 
ReentrantReadWriteLock();
+
     private final Map<ServiceEndPoint, Component> m_remoteEndPointComponents =
         new HashMap<ServiceEndPoint, Component>();
+    private final ReentrantReadWriteLock m_remoteEndPointComponentsLock = new 
ReentrantReadWriteLock();
 
     private volatile ClusterMessageService m_clusterMessageService;
     private volatile DependencyManager m_dependencyManager;
@@ -80,7 +84,8 @@
         ServiceReference serviceReference, Object 
remotableServiceEndpointObject) {
         RemotableServiceEndpoint remotableServiceEndpoint = 
(RemotableServiceEndpoint) remotableServiceEndpointObject;
         ServiceEndPoint serviceEndPoint = 
remotableServiceEndpoint.getServiceEndPoint();
-        synchronized (m_remotableEndPoints) {
+        m_remotableEndPointsLock.writeLock().lock();
+        try {
             if (!m_remotableEndPoints.contains(serviceEndPoint)) {
                 m_remotableEndPoints.add(serviceEndPoint);
                 m_clusterMessageService.publish(DISCOVERY_TOPIC, new 
EndpointPublishMessage(serviceEndPoint));
@@ -88,7 +93,9 @@
             else {
                 throw new IllegalStateException("Unexpected state... needs 
analysis");
             }
-
+        }
+        finally {
+            m_remotableEndPointsLock.writeLock().unlock();
         }
     }
 
@@ -96,7 +103,9 @@
         ServiceReference serviceReference, Object 
remotableServiceEndpointObject) {
         RemotableServiceEndpoint remotableServiceEndpoint = 
(RemotableServiceEndpoint) remotableServiceEndpointObject;
         ServiceEndPoint serviceEndPoint = 
remotableServiceEndpoint.getServiceEndPoint();
-        synchronized (m_remotableEndPoints) {
+
+        m_remotableEndPointsLock.writeLock().lock();
+        try {
             if (m_remotableEndPoints.contains(serviceEndPoint)) {
                 m_clusterMessageService.publish(DISCOVERY_TOPIC, new 
EndpointDepublishMessage(serviceEndPoint));
                 m_remotableEndPoints.remove(serviceEndPoint);
@@ -105,6 +114,9 @@
                 throw new IllegalStateException("Unexpected state... needs 
analysis");
             }
         }
+        finally {
+            m_remotableEndPointsLock.writeLock().unlock();
+        }
     }
 
     /********************************************************
@@ -112,60 +124,80 @@
      ********************************************************/
 
     public ServiceEndPoint[] getLocalServiceEndPoints() {
-        synchronized (m_remotableEndPoints) {
+        m_remotableEndPointsLock.readLock().lock();
+        try {
             return m_remotableEndPoints.toArray(new 
ServiceEndPoint[m_remotableEndPoints.size()]);
         }
+        finally {
+            m_remotableEndPointsLock.readLock().unlock();
+        }
     }
 
     public ServiceEndPoint[] getRemoteServiceEndPoints() {
-        synchronized (m_remoteEndPointComponents) {
+        m_remoteEndPointComponentsLock.readLock().lock();
+        try {
             return m_remoteEndPointComponents.keySet().toArray(
                 new 
ServiceEndPoint[m_remoteEndPointComponents.keySet().size()]);
         }
+        finally {
+            m_remoteEndPointComponentsLock.readLock().unlock();
+        }
     }
 
     /********************************************************
      * ClusterTopicListner
      ********************************************************/
 
-    // TODO handle LOOKUP
     public void recieveMessage(Object message) {
         if (message instanceof EndpointPublishMessage) {
             EndpointPublishMessage endpointPublishMessage = 
(EndpointPublishMessage) message;
             ServiceEndPoint serviceEndPoint = 
endpointPublishMessage.getServiceEndPoint();
-            synchronized (m_remoteEndPointComponents) {
+            m_remoteEndPointComponentsLock.writeLock().lock();
+            try {
                 if (m_remoteEndPointComponents.containsKey(serviceEndPoint))
                     return;
                 Component serviceComponent =
                     
m_dependencyManager.createComponent().setInterface(RemoteServiceEndPoint.class.getName(),
 null)
                         .setImplementation(new 
RemoteServiceEndPointImpl(serviceEndPoint));
-                // FIXME validate that this depdendency works
                 
serviceComponent.add(m_dependencyManager.createServiceDependency().setService(
                     DiscoveryService.class, 
m_component.getServiceRegistration().getReference()));
                 m_dependencyManager.add(serviceComponent);
                 m_remoteEndPointComponents.put(serviceEndPoint, 
serviceComponent);
             }
+            finally {
+                m_remoteEndPointComponentsLock.writeLock().unlock();
+            }
             return;
         }
         if (message instanceof EndpointDepublishMessage) {
             EndpointDepublishMessage endpointDepublishMessage = 
(EndpointDepublishMessage) message;
             ServiceEndPoint serviceEndPoint = 
endpointDepublishMessage.getServiceEndPoint();
-            synchronized (m_remoteEndPointComponents) {
+            m_remoteEndPointComponentsLock.writeLock().lock();
+            try {
                 if (!m_remoteEndPointComponents.containsKey(serviceEndPoint))
                     return;
                 Component serviceComponent = 
m_remoteEndPointComponents.remove(serviceEndPoint);
                 m_dependencyManager.remove(serviceComponent);
             }
+            finally {
+                m_remoteEndPointComponentsLock.writeLock().unlock();
+            }
             return;
         }
         if (message instanceof EndpointDiscoveryMessage) {
-            synchronized (m_remotableEndPoints) {
+            m_remotableEndPointsLock.writeLock().lock();
+            try {
                 for (ServiceEndPoint serviceEndPoint : m_remotableEndPoints) {
                     m_clusterMessageService.publish(DISCOVERY_TOPIC, new 
EndpointPublishMessage(serviceEndPoint));
                 }
             }
+            finally {
+                m_remotableEndPointsLock.writeLock().unlock();
+            }
             return;
         }
+        throw new IllegalStateException("Unknown message type " + message.getClass().getName() + "on channel "
+            + DISCOVERY_TOPIC);
     }
 
     public String getTopic() {

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
  (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
  Tue Dec 21 18:33:02 2010
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
 import org.amdatu.core.fabric.cluster.ClusterMessageService;
@@ -44,17 +45,20 @@
 
 public class DistributionServiceImpl implements DistributionService, 
ClusterTopicListener {
 
-    private final static String MESSAGE_INVOCATION_ID_KEY = "MII";
-    private final static String MESSAGE_INVOCATION_SERVICEENDPOINTINFO_KEY = 
"MIEI";
-    private final static String MESSAGE_INVOCATION_METHODNAME_KEY = "MIMK";
-    private final static String MESSAGE_INVOCATION_ARGUMENTS_KEY = "MIAK";
-    private final static String MESSAGE_RESPONSE_MAP_KEY = "MRM";
+    public final static String MESSAGE_INVOCATION_ID_KEY = "A";
+    public final static String MESSAGE_INVOCATION_METHODNAME_KEY = "B";
+    public final static String MESSAGE_INVOCATION_ARGUMENTS_KEY = "C";
+    public final static String MESSAGE_INVOCATION_RESPONSE_MAP_KEY = "D";
+    public final static String MESSAGE_INVOCATION_ORIGIN_CLUSTERID_KEY = "E";
+    public final static String MESSAGE_INVOCATION_ORIGIN_MEMBERID_KEY = "F";
 
     private final Map<ServiceEndPoint, ServiceReferenceComponentTuple> 
m_serviceEndPointServiceReferenceComponents =
         new HashMap<ServiceEndPoint, ServiceReferenceComponentTuple>();
+    private final ReentrantReadWriteLock 
m_serviceEndPointServiceReferenceComponentsLock = new ReentrantReadWriteLock();
 
     private final Map<ServiceEndPoint, Component> m_serviceEndPointComponents =
         new HashMap<ServiceEndPoint, Component>();
+    private final ReentrantReadWriteLock m_serviceEndPointComponentsLock = new 
ReentrantReadWriteLock();
 
     private volatile ClusterMemberService m_clusterMemberService;
     private volatile ClusterMessageService m_clusterMessageService;
@@ -75,6 +79,7 @@
      ********************************************************/
 
     public synchronized void start() {
+        m_logService.log(LogService.LOG_WARNING, "Starting 
DistributionService");
         Dictionary<String, Object> distributionProps = 
m_component.getServiceProperties();
         distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERID_PROP, 
m_clusterMemberService.getClusterId());
         distributionProps.put(ClusterMemberService.CLUSTER_MEMBERID_PROP, 
m_clusterMemberService.getMemberId());
@@ -87,6 +92,7 @@
     }
 
     public synchronized void stop() {
+        m_logService.log(LogService.LOG_WARNING, "Stopping 
DistributionService");
         m_clusterMessageService.unsubscribe(this);
     }
 
@@ -98,7 +104,7 @@
         ServiceEndPoint serviceEndPoint = 
serviceEndPointFromServiceReference(serviceReference);
         if (!isServiceEndpointConfigurationSupported(serviceEndPoint)) {
             m_logService
-                .log(LogService.LOG_DEBUG, "Unsupported endpoint configuration 
" + serviceEndPoint.toString());
+                .log(LogService.LOG_WARNING, "Unsupported ServiceEndPoint 
configuration " + serviceEndPoint.toString());
             return;
         }
         Dictionary<String, Object> distributionProperties = new 
Hashtable<String, Object>();
@@ -108,28 +114,41 @@
                     .setImplementation(new 
RemotableServiceEndPointImpl(serviceEndPoint));
         
serviceComponent.add(m_dependencyManager.createServiceDependency().setService(
                 DistributionService.class, 
m_component.getServiceRegistration().getReference()));
-        synchronized (m_serviceEndPointServiceReferenceComponents) {
+        m_serviceEndPointServiceReferenceComponentsLock.writeLock().lock();
+        try {
             if 
(!m_serviceEndPointServiceReferenceComponents.containsKey(serviceEndPoint)) {
                 
m_serviceEndPointServiceReferenceComponents.put(serviceEndPoint,
                         new ServiceReferenceComponentTuple(
                             serviceReference, serviceComponent));
                 m_dependencyManager.add(serviceComponent);
+                m_logService.log(LogService.LOG_WARNING, "Added local 
ServiceEndPoint: " + serviceEndPoint.toString());
             }
         }
+        finally {
+            
m_serviceEndPointServiceReferenceComponentsLock.writeLock().unlock();
+        }
     }
 
     public void localRemotableServiceRemoved(ServiceReference serviceReference 
/* , Object Service */) {
         ServiceEndPoint serviceEndPoint = 
serviceEndPointFromServiceReference(serviceReference);
-        synchronized (m_serviceEndPointServiceReferenceComponents) {
+        m_serviceEndPointServiceReferenceComponentsLock.writeLock().lock();
+        try {
             if 
(m_serviceEndPointServiceReferenceComponents.containsKey(serviceEndPoint)) {
                 ServiceReferenceComponentTuple serviceReferenceComponentTuple =
                     
m_serviceEndPointServiceReferenceComponents.remove(serviceEndPoint);
                 
m_dependencyManager.remove(serviceReferenceComponentTuple.getComponent());
+                m_logService
+                    .log(LogService.LOG_WARNING, "Removed local 
ServiceEndPoint: " + serviceEndPoint.toString());
             }
             else {
+                m_logService.log(LogService.LOG_WARNING, "Not removed unknown 
local ServiceEndPoint: "
+                    + serviceEndPoint.toString());
                 throw new IllegalStateException("Unexpected state... this 
needs analysis");
             }
         }
+        finally {
+            
m_serviceEndPointServiceReferenceComponentsLock.writeLock().unlock();
+        }
     }
 
     public void remoteServiceEndPointAdded(/* ServiceReference 
serviceReference, */Object remoteServiceEndPointObject) {
@@ -138,30 +157,46 @@
         Object localServiceProxy = 
createLocalServiceInvocationHandler(serviceEndPoint);
         if (localServiceProxy != null) {
             Component localServiceComponent = 
createLocalServiceComponent(serviceEndPoint, localServiceProxy);
-            synchronized (m_serviceEndPointComponents) {
+            m_serviceEndPointComponentsLock.writeLock().lock();
+            try {
                 if (!m_serviceEndPointComponents.containsKey(serviceEndPoint)) 
{
                     m_serviceEndPointComponents.put(serviceEndPoint, 
localServiceComponent);
                     m_dependencyManager.add(localServiceComponent);
+                    m_logService.log(LogService.LOG_WARNING,
+                        "Added remote ServiceEndPoint: " + 
serviceEndPoint.toString());
                 }
                 else {
+                    m_logService.log(LogService.LOG_WARNING,
+                        "Not added duplicate remote ServiceEndPoint: " + 
serviceEndPoint.toString());
                     throw new IllegalStateException("Unexpected state... this 
needs analysis");
                 }
             }
+            finally {
+                m_serviceEndPointComponentsLock.writeLock().unlock();
+            }
         }
     }
 
     public void remoteServiceEndPointRemoved(/* ServiceReference 
serviceReference, */Object remoteServiceEndPointObject) {
         RemoteServiceEndPoint remoteServiceEndPoint = (RemoteServiceEndPoint) 
remoteServiceEndPointObject;
         ServiceEndPoint serviceEndPoint = 
remoteServiceEndPoint.getServiceEndPoint();
-        synchronized (m_serviceEndPointComponents) {
+        m_serviceEndPointComponentsLock.writeLock().lock();
+        try {
             if (m_serviceEndPointComponents.containsKey(serviceEndPoint)) {
                 Component localServiceComponent = 
m_serviceEndPointComponents.get(serviceEndPoint);
                 m_dependencyManager.remove(localServiceComponent);
+                m_logService.log(LogService.LOG_WARNING,
+                    "Removed remote ServiceEndPoint: " + 
serviceEndPoint.toString());
             }
             else {
+                m_logService.log(LogService.LOG_WARNING,
+                    "Not removed unknown remote ServiceEndPoint: " + 
serviceEndPoint.toString());
                 throw new IllegalStateException("Unexpected state... this 
needs analysis");
             }
         }
+        finally {
+            m_serviceEndPointComponentsLock.writeLock().unlock();
+        }
     }
 
     /********************************************************
@@ -175,21 +210,33 @@
     public void recieveMessage(Object message) {
         if (message instanceof EndpointInvokeMessage) {
             EndpointInvokeMessage endpointInvokeMessage = 
(EndpointInvokeMessage) message;
+            ServiceEndPoint serviceEndPoint = 
endpointInvokeMessage.getServiceEndPoint();
             Map<String, Object> payload = endpointInvokeMessage.getPayload();
             String invocationId = (String) 
payload.get(MESSAGE_INVOCATION_ID_KEY);
-            ServiceEndPoint serviceEndpoint = (ServiceEndPoint) payload
-                        .get(MESSAGE_INVOCATION_SERVICEENDPOINTINFO_KEY);
-
+            String originClusterId = (String) 
payload.get(MESSAGE_INVOCATION_ORIGIN_CLUSTERID_KEY);
+            String originMemberId = (String) 
payload.get(MESSAGE_INVOCATION_ORIGIN_MEMBERID_KEY);
             String methodName = (String) 
payload.get(MESSAGE_INVOCATION_METHODNAME_KEY);
             Object[] args = (Object[]) 
payload.get(MESSAGE_INVOCATION_ARGUMENTS_KEY);
             Class<?>[] types = DistributionUtilities.getTypesFromArgs(args);
 
             Object serviceResponse = null;
-            synchronized (m_serviceEndPointServiceReferenceComponents) {
+            m_serviceEndPointServiceReferenceComponentsLock.readLock().lock();
+            try {
                 ServiceReferenceComponentTuple serviceReferenceComponentTuple =
-                    
m_serviceEndPointServiceReferenceComponents.get(serviceEndpoint);
+                    
m_serviceEndPointServiceReferenceComponents.get(serviceEndPoint);
                 if (serviceReferenceComponentTuple == null) {
-                    // TODO local service gone.. what to do? Send back error 
to prevent client from waiting...
+                    if 
(serviceEndPoint.getMemberId().equals(m_clusterMemberService.getMemberId())) {
+                        // TODO local service gone.. what to do? Send back 
error to prevent client from waiting...
+                        m_logService
+                            .log(
+                                LogService.LOG_WARNING,
+                                "Dropping EndpointInvokeMessage for unknown 
ServiceEndPoint: "
+                                    + serviceEndPoint.toString());
+                    }
+                    else {
+                        m_logService.log(LogService.LOG_WARNING,
+                            "Dropping EndpointInvokeMessage for other 
clustermember: " + serviceEndPoint.toString());
+                    }
                     return;
                 }
                 ServiceReference serviceReference = 
serviceReferenceComponentTuple.getServiceReference();
@@ -197,6 +244,11 @@
                     Object serviceObject = 
m_bundleContext.getService(serviceReference);
                     if (serviceObject == null) {
                         // TODO local service gone.. what to do? Send back 
error to prevent client from waiting...
+                        m_logService
+                            .log(
+                                LogService.LOG_WARNING,
+                                "Dropping EndpointInvokeMessage for 
unavailable service: "
+                                    + serviceEndPoint.toString());
                         return;
                     }
                     Method serviceMethod = 
serviceObject.getClass().getMethod(methodName, types);
@@ -206,38 +258,47 @@
                 }
                 catch (SecurityException e) {
                     // TODO its fooked.. what to do? Send back error to 
prevent client from waiting...
+                    m_logService.log(LogService.LOG_ERROR, "Exception during 
local service invocation", e);
                     e.printStackTrace();
                 }
                 catch (NoSuchMethodException e) {
                     // TODO its fooked.. what to do? Send back error to 
prevent client from waiting...
+                    m_logService.log(LogService.LOG_ERROR, "Exception during 
local service invocation", e);
                     e.printStackTrace();
                 }
                 catch (IllegalArgumentException e) {
                     // TODO its fooked.. what to do? Send back error to 
prevent client from waiting...
+                    m_logService.log(LogService.LOG_ERROR, "Exception during 
local service invocation", e);
                     e.printStackTrace();
                 }
                 catch (IllegalAccessException e) {
                     // TODO its fooked.. what to do? Send back error to 
prevent client from waiting...
+                    m_logService.log(LogService.LOG_ERROR, "Exception during 
local service invocation", e);
                     e.printStackTrace();
                 }
                 catch (InvocationTargetException e) {
                     // TODO its fooked.. what to do? Send back error to 
prevent client from waiting...
+                    m_logService.log(LogService.LOG_ERROR, "Exception during 
local service invocation", e);
                     e.printStackTrace();
                 }
             }
+            finally {
+                
m_serviceEndPointServiceReferenceComponentsLock.readLock().unlock();
+            }
             if (serviceResponse != null) {
                 Map<String, Object> responsePayload = new HashMap<String, 
Object>();
                 responsePayload.put(MESSAGE_INVOCATION_ID_KEY, invocationId);
-                responsePayload.put(MESSAGE_RESPONSE_MAP_KEY, serviceResponse);
+                responsePayload.put(MESSAGE_INVOCATION_RESPONSE_MAP_KEY, 
serviceResponse);
                 m_clusterMessageService
-                    .publish(REMOTE_TOPIC, new 
EndpointResponseMessage(serviceEndpoint, responsePayload));
+                    .publish(REMOTE_TOPIC,
+                        new EndpointResponseMessage(originClusterId, 
originMemberId, responsePayload));
             }
         }
     }
 
-    /*
+    /********************************************************
      * private
-     */
+     ********************************************************/
 
     private Object createLocalServiceInvocationHandler(ServiceEndPoint 
serviceEndpoint) {
         Class<?>[] interfaceClasses = new 
Class<?>[serviceEndpoint.getObjectClass().length];
@@ -278,7 +339,8 @@
         Component component = m_dependencyManager.createComponent()
             .setInterface(serviceEndPoint.getObjectClass(), 
registrationProperties)
             .setImplementation(serviceObject);
-
+        
component.add(m_dependencyManager.createServiceDependency().setService(ClusterMemberService.class)
+            .setRequired(true));
         
component.add(m_dependencyManager.createServiceDependency().setService(ClusterMessageService.class)
             .setRequired(true));
         return component;

Modified: 
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
        (original)
+++ 
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
        Tue Dec 21 18:33:02 2010
@@ -130,6 +130,20 @@
                 "unknown_intent" });
         Assert.assertFalse("Unknown intent specified... cannot be supported",
             DistributionUtilities.isExportedIntentsListSupported(props));
+
+        props = new Hashtable<String, Object>();
+        props.put(DistributionService.SERVICE_INTENTS_PROP, new String[] {});
+        props.put(DistributionService.SERVICE_EXPORTED_INTENTS_PROP,
+            new String[] { 
DistributionService.DISTRIBUTION_INTENT_CONFIDENTIALITY });
+        props.put(DistributionService.SERVICE_EXPORTED_INTENTS_EXTRA_PROP, new 
String[] {});
+        props.put(DistributionService.SERVICE_EXPORTED_INTERFACES_PROP,
+            new String[] { RemotableInterface1.class.getName() });
+        props.put(DistributionService.SERVICE_EXPORTED_CONFIGS_PROP,
+            new String[] { DistributionService.SERVICE_CONFIGURATION_TYPE });
+        props.put("org.amdatu.test.prop", "this property should be published 
on imported services");
+        Assert.assertTrue("Unknown intent specified... cannot be supported",
+            DistributionUtilities.isExportedIntentsListSupported(props));
+
     }
 
     @Test

Reply via email to