Author: bdekruijff at gmail.com
Date: Fri Dec 31 17:13:10 2010
New Revision: 553

Log:
[sandbox] Major fabric refactor changing to whiteboard/eventadmin model

Added:
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
      - copied, changed from r534, 
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java
      - copied, changed from r534, 
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
      - copied, changed from r534, 
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
   sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
Removed:
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMessageListener.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMessageService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterTopicListener.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/TopicMessageWrapper.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMessageServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java
Modified:
   sandbox/bdekruijff/fabric/pom.xml
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelCreator.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DiscoveryService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/RemoteServiceEndPoint.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DistributionUtilities.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java

Modified: sandbox/bdekruijff/fabric/pom.xml
==============================================================================
--- sandbox/bdekruijff/fabric/pom.xml   (original)
+++ sandbox/bdekruijff/fabric/pom.xml   Fri Dec 31 17:13:10 2010
@@ -36,7 +36,7 @@
             
<Bundle-Activator>org.amdatu.core.fabric.osgi.Activator</Bundle-Activator>
             <Bundle-SymbolicName>org.amdatu.core.fabric</Bundle-SymbolicName>
             <DynamicImport-Package>*</DynamicImport-Package>
-            <Export-Package>org.amdatu.core.fabric.cluster, 
org.amdatu.core.fabric.remote</Export-Package>
+            <Export-Package>org.amdatu.core.fabric, 
org.amdatu.core.fabric.cluster, org.amdatu.core.fabric.remote</Export-Package>
             <Embed-Dependency>*;scope=compile</Embed-Dependency>
             <Embed-Transitive>true</Embed-Transitive>
           </instructions>

Added: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java
==============================================================================
--- (empty file)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java
    Fri Dec 31 17:13:10 2010
@@ -0,0 +1,21 @@
+package org.amdatu.core.fabric;
+
+import java.util.Dictionary;
+
+public interface FabricManagerService {
+
+    String CONFIGURATION_PID = "org.amdatu.core.fabric";
+
+    boolean createClusterMember(String clusterGroupId, String clusterMemberId, 
Dictionary<String, Object> properties);
+
+    boolean removeClusterMember(String clusterGroupId, String clusterMemberId);
+
+    boolean createDiscovery(String clusterGroupId, String serviceGroupId);
+
+    boolean removeDiscovery(String clusterGroupId, String serviceGroupId);
+
+    boolean createDistribution(String clusterGroupId, String serviceGroupId);
+
+    boolean removeDistribution(String clusterGroupId, String serviceGroupId);
+
+}

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java
   (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java
   Fri Dec 31 17:13:10 2010
@@ -18,5 +18,5 @@
 
 public interface ClusterMember {
 
-    String getId();
+    String getMemberId();
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
    (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
    Fri Dec 31 17:13:10 2010
@@ -16,28 +16,11 @@
  */
 package org.amdatu.core.fabric.cluster;
 
-import java.util.Dictionary;
-
 public interface ClusterMemberService {
 
-    String CLUSTER_CLUSTERID_PROP = "org.amdatu.fabric.cluster.CLUSTERID";
-    String CLUSTER_MEMBERID_PROP = "org.amdatu.fabric.cluster.MEMBERID";
-
-    String getClusterId();
-
-    String getMemberId();
-
-    Dictionary<String, Object> getProperties();
-
-    ClusterMember[] getClusterMembers();
-
-    ClusterMember getClusterMember(String memberId);
-
-    void broadcast(Object message);
-
-    void send(ClusterMember[] clusterMembers, Object message);
-
-    void subscribe(ClusterMessageListener clusterMessageListener);
+    String CLUSTER_CLUSTERGROUP_PROP = 
"org.amdatu.fabric.cluster.CLUSTERGROUP";
+    String CLUSTER_CLUSTERMEMBER_PROP = 
"org.amdatu.fabric.cluster.CLUSTERMEMBER";
 
-    void unsubscribe(ClusterMessageListener clusterMessageListener);
+    String EVENT_TOPIC_BROADCAST = "org/amdatu/fabric/cluster/BROADCAST";
+    String EVENT_MESSAGE_PROPERTY = "org/amdatu/fabric/cluster/MESSAGE";
 }

Added: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java
==============================================================================
--- (empty file)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java
       Fri Dec 31 17:13:10 2010
@@ -0,0 +1,25 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.cluster;
+
+/**
+ * Interface for messages that want to specify the EventAdmin topic that the
+ * ClusterMemberService should post them on at the receiver end.
+ */
+public interface LocalTopicMessage {
+    String getLocalTopic();
+}

Copied: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
 (from r534, 
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java)
==============================================================================
--- 
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java
 (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
 Fri Dec 31 17:13:10 2010
@@ -1,27 +1,83 @@
-package org.amdatu.core.fabric.cluster.internal;
+package org.amdatu.core.fabric.cluster;
 
 import java.io.Serializable;
 
-public abstract class RoutedMessage implements Serializable {
-
-    private final String m_targetClusterId;
-    private final String[] m_targetMemberIds;
+/**
+ * Base class for messages that want to enable routing support. Target information may
+ * be set to avoid a cluster wide broadcast. The origin information will be injected by
+ * the ClusterMemberService upon broadcast/send.
+ * 
+ * FIXME ServiceGroup is not a cluster concept
+ */
+public abstract class RoutableMessage implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private String m_originClusterId;
+    private String m_originMemberId;
+    private String m_originServiceGroup;
+
+    private String m_targetClusterId;
+    private String m_targetMemberId;
+    private String m_targetServiceGroup;
+
+    public RoutableMessage(final String serviceGroup) {
+        m_originServiceGroup = serviceGroup;
+        m_targetServiceGroup = serviceGroup;
+    }
 
-    public RoutedMessage(final String clusterId, final String memberId) {
+    public RoutableMessage(final String clusterId, final String memberId, 
final String serviceGroup) {
         m_targetClusterId = clusterId;
-        m_targetMemberIds = new String[] { memberId };
+        m_targetServiceGroup = serviceGroup;
+        m_targetMemberId = memberId;
     }
 
-    public RoutedMessage(final String clusterId, String[] memberIds) {
-        m_targetClusterId = clusterId;
-        m_targetMemberIds = memberIds;
+    public final String getOriginClusterId() {
+        return m_originClusterId;
+    }
+
+    public final void setOriginClusterId(String clusterId) {
+        m_originClusterId = clusterId;
+    }
+
+    public final String getOriginMemberId() {
+        return m_originMemberId;
+    }
+
+    public final void setOriginMemberId(String memberId) {
+        m_originMemberId = memberId;
     }
 
-    public final String getClusterId() {
+    public final String getOriginServiceGroup() {
+        return m_originServiceGroup;
+    }
+
+    public final void setOriginServcieGroup(String serviceGroup) {
+        m_originServiceGroup = serviceGroup;
+    }
+
+    public final String getTargetClusterId() {
         return m_targetClusterId;
     }
 
-    public final String[] getMemberIds() {
-        return m_targetMemberIds;
+    public final void setTargetClusterId(String clusterId) {
+        m_targetClusterId = clusterId;
+    }
+
+    public final String getTargetMemberId() {
+        return m_targetMemberId;
+    }
+
+    public final void setTargetMemberId(String memberId) {
+        m_targetMemberId = memberId;
     }
+
+    public final String getTargetServiceGroup() {
+        return m_targetServiceGroup;
+    }
+
+    public final void setTargetServiceGroup(String serviceGroup) {
+        m_targetServiceGroup = serviceGroup;
+    }
+
 }
\ No newline at end of file

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java
      (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java
      Fri Dec 31 17:13:10 2010
@@ -20,22 +20,22 @@
 
 public class ClusterMemberImpl implements ClusterMember {
 
-    public final String m_id;
+    public final String m_memberId;
 
     /********************************************************
      * Constructors
      ********************************************************/
 
-    public ClusterMemberImpl(final String id) {
-        m_id = id;
+    public ClusterMemberImpl(final String memberId) {
+        m_memberId = memberId;
     }
 
     /********************************************************
      * ClusterMember
      ********************************************************/
 
-    public String getId() {
-        return m_id;
+    public String getMemberId() {
+        return m_memberId;
     }
 
     /********************************************************
@@ -44,11 +44,11 @@
 
     @Override
     public boolean equals(Object obj) {
-        return m_id.equals(((ClusterMemberImpl) obj).getId());
+        return m_memberId.equals(((ClusterMemberImpl) obj).getMemberId());
     }
 
     @Override
     public int hashCode() {
-        return m_id.hashCode();
+        return m_memberId.hashCode();
     }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelCreator.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelCreator.java
  (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelCreator.java
  Fri Dec 31 17:13:10 2010
@@ -62,6 +62,10 @@
     }
 
     public static Channel createChannel(String[] args) throws Exception {
+
+        // amdatu
+        byte[] domain = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
+
         String bind = "auto";
         int port = 4001;
         String mbind = null;
@@ -169,6 +173,12 @@
             else if ("-mbind".equals(args[i])) {
                 mbind = args[++i];
             }
+            else if ("-domain".equals(args[i])) {
+                String dom = args[++i];
+                if (dom.equals("amdatu")) {
+                    domain = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 1 };
+                }
+            }
         }
 
         System.out.println("Creating receiver class=" + receiver);
@@ -248,7 +258,6 @@
             channel.addInterceptor(smi);
         }
 
-        byte[] domain = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
         ((McastService) channel.getMembershipService()).setDomain(domain);
         DomainFilterInterceptor filter = new DomainFilterInterceptor();
         filter.setDomain(domain);

Copied: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java
 (from r534, 
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java)
==============================================================================
--- 
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
   (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java
        Fri Dec 31 17:13:10 2010
@@ -19,42 +19,51 @@
 import java.util.Dictionary;
 import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMember;
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.ClusterMessageListener;
+import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.RoutableMessage;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.apache.felix.dm.ServiceDependency;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.osgi.service.log.LogService;
 
 /**
  * I manage cluster state
  */
-public abstract class AbstractClusterMemberService implements 
ClusterMemberService {
+public abstract class BaseClusterMemberService implements ClusterMemberService 
{
 
     private final Map<String, ClusterMember> m_clusterMembers = new 
HashMap<String, ClusterMember>();
-    private final Set<ClusterMessageListener> m_clusterMessageListeners = new 
HashSet<ClusterMessageListener>();
-
     private final ReentrantReadWriteLock m_clusterMembersLock = new 
ReentrantReadWriteLock();
-    private final ReentrantReadWriteLock m_clusterMessageListenersLock = new 
ReentrantReadWriteLock();
-
-    private final ExecutorService m_executorService = 
Executors.newFixedThreadPool(1);
 
     private final String m_clusterId;
     private final String m_memberId;
     private final Map<String, Object> m_properties;
 
+    // injected
+    private volatile DependencyManager m_dependencyManager;
+    private volatile Component m_component;
+    private volatile EventAdmin m_eventAdmin;
+    private volatile LogService m_logService;
+
+    private volatile Component m_broadcastEventHandlerComponent;
+
     /********************************************************
      * Constructors
      ********************************************************/
 
-    public AbstractClusterMemberService(String clusterId, String memberId, 
Dictionary<String, Object> properties) {
-        m_clusterId = clusterId;
-        m_memberId = memberId;
+    public BaseClusterMemberService(String clusterGroupId, String 
clusterMemberId,
+        Dictionary<String, Object> properties) {
+        m_clusterId = clusterGroupId;
+        m_memberId = clusterMemberId;
         m_properties = new HashMap<String, Object>();
         if (properties != null) {
             Enumeration<String> enumeration = properties.keys();
@@ -66,6 +75,54 @@
     }
 
     /********************************************************
+     * Service lifecycle
+     ********************************************************/
+
+    public final synchronized void init() {
+        @SuppressWarnings("unchecked")
+        Dictionary<String, Object> serviceProps = 
m_component.getServiceProperties();
+        if (serviceProps == null) {
+            serviceProps = new Hashtable<String, Object>();
+        }
+        serviceProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, 
m_clusterId);
+        serviceProps.put(ClusterMemberService.CLUSTER_CLUSTERMEMBER_PROP, 
m_memberId);
+        m_component.setServiceProperties(serviceProps);
+
+        ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
+        logServiceDependency.setService(LogService.class);
+        logServiceDependency.setRequired(true);
+        m_component.add(logServiceDependency);
+
+        ServiceDependency eventAdminServiceDependency = 
m_dependencyManager.createServiceDependency();
+        eventAdminServiceDependency.setService(EventAdmin.class);
+        eventAdminServiceDependency.setRequired(true);
+        m_component.add(eventAdminServiceDependency);
+
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(EventConstants.EVENT_TOPIC, new String[] { 
EVENT_TOPIC_BROADCAST + "/" + m_clusterId });
+        m_broadcastEventHandlerComponent = 
m_dependencyManager.createComponent();
+        
m_broadcastEventHandlerComponent.setInterface(EventHandler.class.getName(), 
props);
+        m_broadcastEventHandlerComponent.setImplementation(new 
BroadcastEventHandler());
+        onInit();
+    }
+
+    public final synchronized void destroy() {
+        onDestroy();
+    }
+
+    public final synchronized void start() {
+        m_logService.log(LogService.LOG_WARNING, "Starting 
ClusterMemberService");
+        m_dependencyManager.add(m_broadcastEventHandlerComponent);
+        onStart();
+    }
+
+    public final synchronized void stop() {
+        m_logService.log(LogService.LOG_WARNING, "Stopping 
ClusterMemberService");
+        m_dependencyManager.remove(m_broadcastEventHandlerComponent);
+        onStop();
+    }
+
+    /********************************************************
      * ClusterMemberService
      ********************************************************/
 
@@ -102,44 +159,18 @@
         }
     }
 
-    public final void broadcast(Object message) {
-        m_executorService.submit(new BroadcastRunnable(message));
-    }
-
-    public final void send(ClusterMember[] clusterMembers, Object message) {
-        m_executorService.submit(new SendRunnable(clusterMembers, message));
-    }
-
-    public final void subscribe(ClusterMessageListener clusterMessageListener) 
{
-        m_clusterMessageListenersLock.writeLock().lock();
-        try {
-            m_clusterMessageListeners.add(clusterMessageListener);
-        }
-        finally {
-            m_clusterMessageListenersLock.writeLock().unlock();
-        }
-        onSubscribe(clusterMessageListener);
-    }
-
-    public final void unsubscribe(ClusterMessageListener 
clusterMessageListener) {
-        m_clusterMessageListenersLock.writeLock().lock();
-        try {
-            m_clusterMessageListeners.remove(clusterMessageListener);
-        }
-        finally {
-            m_clusterMessageListenersLock.writeLock().unlock();
-        }
-        onUnsubscribe(clusterMessageListener);
-    }
-
     /********************************************************
      * for implementing concrete classes
      ********************************************************/
 
+    protected final LogService getLogService() {
+        return m_logService;
+    }
+
     protected final void addClusterMember(ClusterMember clusterMember) {
         m_clusterMembersLock.writeLock().lock();
         try {
-            m_clusterMembers.put(clusterMember.getId(), clusterMember);
+            m_clusterMembers.put(clusterMember.getMemberId(), clusterMember);
         }
         finally {
             m_clusterMembersLock.writeLock().unlock();
@@ -149,7 +180,7 @@
     protected final void removeClusterMember(ClusterMember clusterMember) {
         m_clusterMembersLock.writeLock().lock();
         try {
-            m_clusterMembers.remove(clusterMember.getId());
+            m_clusterMembers.remove(clusterMember.getMemberId());
         }
         finally {
             m_clusterMembersLock.writeLock().unlock();
@@ -157,71 +188,74 @@
     }
 
     protected final void dispatchMessage(Object message) {
-        m_clusterMessageListenersLock.readLock().lock();
-        try {
-            for (ClusterMessageListener clm : m_clusterMessageListeners) {
-                m_executorService.submit(new MessageDispatchRunnable(clm, 
message));
-            }
-        }
-        finally {
-            m_clusterMessageListenersLock.readLock().unlock();
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(EVENT_MESSAGE_PROPERTY, message);
+        // FIXME DO NOT REBROADCAST
+        String topic = EVENT_TOPIC_BROADCAST;
+        if (message instanceof LocalTopicMessage) {
+            topic = ((LocalTopicMessage) message).getLocalTopic();
         }
+        Event broadCastEvent = new Event(topic, props);
+        m_eventAdmin.postEvent(broadCastEvent);
     }
 
-    protected void onSubscribe(ClusterMessageListener clusterMessageListener) {
-    }
+    protected abstract void onInit();
 
-    protected void onUnsubscribe(ClusterMessageListener 
clusterMessageListener) {
-    }
+    protected abstract void onDestroy();
+
+    protected abstract void onStart();
 
-    public abstract void doBroadcast(Object message);
+    protected abstract void onStop();
 
-    public abstract void doSend(ClusterMember[] clusterMember, Object message);
+    protected abstract void doBroadcast(Object message);
+
+    protected abstract void doSend(ClusterMember[] clusterMember, Object 
message);
 
     /********************************************************
      * helper classes
      ********************************************************/
 
-    class BroadcastRunnable implements Runnable {
-
-        private final Object m_message;
-
-        public BroadcastRunnable(final Object message) {
-            m_message = message;
-        }
-
-        public void run() {
-            doBroadcast(m_message);
-        }
-    }
+    class BroadcastEventHandler implements EventHandler {
 
-    class SendRunnable implements Runnable {
-
-        private final ClusterMember[] m_clusterMember;
-        private final Object m_message;
-
-        public SendRunnable(final ClusterMember[] clusterMembers, final Object 
message) {
-            m_clusterMember = clusterMembers;
-            m_message = message;
-        }
-
-        public void run() {
-            doSend(m_clusterMember, m_message);
-        }
-    }
-
-    static class MessageDispatchRunnable implements Runnable {
-
-        private final ClusterMessageListener m_clusterMessageListener;
-        private final Object m_message;
-
-        public MessageDispatchRunnable(final ClusterMessageListener 
clusterMessageListener, final Object message) {
-            m_clusterMessageListener = clusterMessageListener;
-            m_message = message;
-        }
-
-        public void run() {
-            m_clusterMessageListener.recieveMessage(m_message);
+        public void handleEvent(Event event) {
+            Object message = event.getProperty(EVENT_MESSAGE_PROPERTY);
+            if (message instanceof RoutableMessage) {
+
+                RoutableMessage routableMessage = (RoutableMessage) message;
+                routableMessage.setOriginClusterId(m_clusterId);
+                routableMessage.setOriginMemberId(m_memberId);
+
+                if (routableMessage.getTargetClusterId() == null) {
+                    routableMessage.setTargetClusterId(m_clusterId);
+                }
+                else {
+                    // FIXME address this
+                    if 
(!routableMessage.getTargetClusterId().equals(getClusterId())) {
+                        m_logService.log(LogService.LOG_ERROR, 
RoutableMessage.class.getSimpleName()
+                                + " is not for this cluster: " + 
routableMessage.getTargetClusterId());
+                    }
+                }
+
+                ClusterMember clusterMember = null;
+                if (routableMessage.getTargetMemberId() != null) {
+                    clusterMember = 
getClusterMember(routableMessage.getTargetMemberId());
+                    if (clusterMember == null) {
+                        // FIXME address this
+                        m_logService.log(LogService.LOG_ERROR, "RoutedMessage 
specifies unknown target member: "
+                            + routableMessage.getTargetMemberId());
+                    }
+                }
+
+                if (clusterMember != null) {
+                    doSend(new ClusterMember[] { clusterMember }, message);
+                }
+                else {
+                    doBroadcast(message);
+                }
+            }
+            else {
+                doBroadcast(message);
+            }
         }
     }
 }

Copied: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
 (from r534, 
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java)
==============================================================================
--- 
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java
        (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
   Fri Dec 31 17:13:10 2010
@@ -31,93 +31,110 @@
 import org.amdatu.core.fabric.cluster.ClusterMember;
 import org.amdatu.core.fabric.cluster.internal.ClusterMemberImpl;
 import org.amdatu.core.fabric.cluster.internal.tribes.ChannelCreator;
-import org.amdatu.core.fabric.cluster.service.AbstractClusterMemberService;
+import org.amdatu.core.fabric.cluster.service.BaseClusterMemberService;
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelListener;
 import org.apache.catalina.tribes.ManagedChannel;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
+import org.osgi.service.log.LogService;
 
-public final class ClusterMemberServiceImpl extends 
AbstractClusterMemberService implements MembershipListener,
-    ChannelListener {
+public final class TribesClusterMemberServiceImpl extends 
BaseClusterMemberService {
 
     public static final String CLUSTER_TRIBES_ARGS_PROP = 
"org.amdatu.fabric.cluster.tribes.args";
 
     private final Map<ClusterMember, Member> m_clusterMemberMembers = new 
HashMap<ClusterMember, Member>();
-
     private final ReentrantReadWriteLock m_clusterMemberMembersLock = new 
ReentrantReadWriteLock();
 
-    private ManagedChannel m_managedChannel;
+    private volatile ManagedChannel m_managedChannel;
 
     /********************************************************
      * Constructors
      ********************************************************/
 
-    public ClusterMemberServiceImpl(String clusterId, String memberId, 
Dictionary<String, Object> properties) {
-        super(clusterId, memberId, properties);
+    public TribesClusterMemberServiceImpl(String clusterGroupId, String 
clusterMemberId,
+        Dictionary<String, Object> properties) {
+        super(clusterGroupId, clusterMemberId, properties);
     }
 
     /********************************************************
      * Service lifecycle
      ********************************************************/
 
-    public void start() {
+    protected synchronized void onInit() {
+    }
+
+    protected synchronized void onDestroy() {
+    }
+
+    protected synchronized void onStart() {
         try {
+            getLogService().log(LogService.LOG_DEBUG, "Starting managed 
channel");
             m_managedChannel =
                 (ManagedChannel) ChannelCreator.createChannel((String[]) 
getProperties().get(
                     CLUSTER_TRIBES_ARGS_PROP));
 
             Properties props = new Properties();
-            props.setProperty(CLUSTER_MEMBERID_PROP, getMemberId());
-
-            m_managedChannel.addMembershipListener(this);
-            m_managedChannel.addChannelListener(this);
+            props.setProperty(CLUSTER_CLUSTERMEMBER_PROP, getMemberId());
+            m_managedChannel.addMembershipListener(new 
TribesMembershipListener());
+            m_managedChannel.addChannelListener(new TribesChannelListener());
             
m_managedChannel.getMembershipService().setPayload(getPayload(props));
-
             m_managedChannel.start(Channel.DEFAULT);
         }
         catch (Exception e) {
-            e.printStackTrace();
+            getLogService().log(LogService.LOG_ERROR, "Exception while 
starting managed channel", e);
         }
     }
 
-    public void stop() {
+    protected synchronized void onStop() {
         try {
+            getLogService().log(LogService.LOG_DEBUG, "Stopping managed 
channel");
             m_managedChannel.stop(Channel.DEFAULT);
         }
-        catch (Exception x) {
-            x.printStackTrace();
+        catch (Exception e) {
+            getLogService().log(LogService.LOG_ERROR, "Exception while 
stopping managed channel", e);
         }
     }
 
     /********************************************************
-     * ClusterMemberService
+     * ClusterMemberService interface
      ********************************************************/
 
     @Override
     public void doBroadcast(Object message) {
-        // TODO check and wrap message. Look into send options
         if (message instanceof Serializable) {
             try {
                 Member[] members = m_managedChannel.getMembers();
-                if (members.length > 0)
+                if (members.length > 0) {
                     m_managedChannel.send(members, (Serializable) message,
                         Channel.SEND_OPTIONS_ASYNCHRONOUS);
+                }
+                else {
+                    getLogService().log(
+                        LogService.LOG_WARNING,
+                        "Dropping message during broadcast because there are no members on my channel: "
+                            + message.toString());
+                }
             }
             catch (ChannelException e) {
-                e.printStackTrace();
+                getLogService().log(LogService.LOG_ERROR, "Exception during send on managed channel", e);
             }
         }
+        else { // not sendable (null or not Serializable): log and drop; stay null-safe while logging
+            String type = (message == null) ? "null" : message.getClass().getName();
+            getLogService().log(LogService.LOG_ERROR,
+                "Dropping message of type " + type + " because it is not Serializable: " + message);
+        }
     }
 
     @Override
     public void doSend(ClusterMember[] clusterMembers, Object message) {
-        // TODO check and wrap message. Look into send options
         if (message instanceof Serializable) {
             try {
                 List<Member> members = new LinkedList<Member>();
-                synchronized (m_clusterMemberMembers) {
+                m_clusterMemberMembersLock.readLock().lock();
+                try {
                     for (ClusterMember clusterMember : clusterMembers) {
                         Member member = 
m_clusterMemberMembers.get(clusterMember);
                         if (member != null) {
@@ -125,93 +142,106 @@
                         }
                     }
                 }
-                if (members.size() > 0)
+                finally {
+                    m_clusterMemberMembersLock.readLock().unlock();
+                }
+                if (members.size() > 0) {
                     m_managedChannel.send(members.toArray(new 
Member[members.size()]), (Serializable) message,
                         Channel.SEND_OPTIONS_ASYNCHRONOUS);
+                }
+                else {
+                    getLogService().log(LogService.LOG_WARNING,
+                        "Dropping message during send because there are no 
matching members on my channel: "
+                            + message.toString());
+                }
             }
             catch (ChannelException e) {
-                e.printStackTrace();
+                getLogService().log(LogService.LOG_ERROR, "Exception during 
send on managed channel", e);
             }
         }
     }
 
     /********************************************************
-     * MembershipListener
+     * Utility methods
      ********************************************************/
 
-    public void memberAdded(Member member) {
-        try {
-            ClusterMember clusterMember = new 
ClusterMemberImpl(getProperties(member.getPayload())
-                .getProperty(CLUSTER_MEMBERID_PROP));
-            m_clusterMemberMembersLock.writeLock().lock();
+    private byte[] getPayload(Properties props) throws IOException {
+        ByteArrayOutputStream bout = new ByteArrayOutputStream();
+        props.store(bout, "");
+        return bout.toByteArray();
+    }
+
+    private Properties getProperties(byte[] payload) throws IOException {
+        ByteArrayInputStream bin = new ByteArrayInputStream(payload);
+        Properties props = new Properties();
+        props.load(bin);
+        return props;
+    }
+
+    /********************************************************
+     * Helper classes
+     ********************************************************/
+
+    class TribesMembershipListener implements MembershipListener {
+
+        public void memberAdded(Member member) {
             try {
-                m_clusterMemberMembers.put(clusterMember, member);
+                getLogService().log(LogService.LOG_DEBUG, "Member added: " + 
member.toString());
+                ClusterMember clusterMember = new 
ClusterMemberImpl(getProperties(member.getPayload())
+                    .getProperty(CLUSTER_CLUSTERMEMBER_PROP));
+                m_clusterMemberMembersLock.writeLock().lock();
+                try {
+                    m_clusterMemberMembers.put(clusterMember, member);
+                    addClusterMember(clusterMember);
+                }
+                finally {
+                    m_clusterMemberMembersLock.writeLock().unlock();
+                }
             }
-            finally {
-                m_clusterMemberMembersLock.writeLock().unlock();
+            catch (Exception e) {
+                getLogService().log(LogService.LOG_ERROR, "Exception while 
adding member: " + member.toString(), e);
             }
-            addClusterMember(clusterMember);
-        }
-        catch (Exception x) {
-            x.printStackTrace();
         }
-    }
 
-    public void memberDisappeared(Member member) {
-        try {
-            // FIXME use memberid to tuple map to reduce object creation
-            String memberId = getProperties(member.getPayload())
-                .getProperty(CLUSTER_MEMBERID_PROP);
-            ClusterMember toBeRemoved = null;
-            m_clusterMemberMembersLock.writeLock().lock();
+        public void memberDisappeared(Member member) {
             try {
-                for (ClusterMember clusterMember : 
m_clusterMemberMembers.keySet()) {
-                    if (clusterMember.getId().equals(memberId)) {
-                        toBeRemoved = clusterMember;
-                        break;
+                // FIXME use memberid to tuple map to reduce object creation
+                getLogService().log(LogService.LOG_DEBUG, "Member disappeared: 
" + member.toString());
+                String memberId = getProperties(member.getPayload())
+                    .getProperty(CLUSTER_CLUSTERMEMBER_PROP);
+                ClusterMember toBeRemoved = null;
+                m_clusterMemberMembersLock.writeLock().lock();
+                try {
+                    for (ClusterMember clusterMember : 
m_clusterMemberMembers.keySet()) {
+                        if (clusterMember.getMemberId().equals(memberId)) {
+                            toBeRemoved = clusterMember;
+                            break;
+                        }
+                    }
+                    if (toBeRemoved != null) {
+                        removeClusterMember(toBeRemoved);
+                        m_clusterMemberMembers.remove(toBeRemoved);
                     }
                 }
-                if (toBeRemoved != null) {
-                    m_clusterMemberMembers.remove(toBeRemoved);
+                finally {
+                    m_clusterMemberMembersLock.writeLock().unlock();
                 }
             }
-            finally {
-                m_clusterMemberMembersLock.writeLock().unlock();
+            catch (Exception e) {
+                getLogService().log(LogService.LOG_ERROR, "Exception while 
removing member: " + member.toString(), e);
             }
-            removeClusterMember(toBeRemoved);
         }
-        catch (Exception x) {
-            x.printStackTrace();
-        }
-    }
-
-    /********************************************************
-     * ChannelListener
-     ********************************************************/
-
-    public boolean accept(Serializable message, Member arg1) {
-        return true;
-    }
-
-    public void messageReceived(Serializable message, Member member) {
-        dispatchMessage(message);
     }
 
-    /********************************************************
-     * private
-     ********************************************************/
+    class TribesChannelListener implements ChannelListener {
 
-    private byte[] getPayload(Properties props) throws IOException {
-        ByteArrayOutputStream bout = new ByteArrayOutputStream();
-        props.store(bout, "");
-        return bout.toByteArray();
-    }
+        public boolean accept(Serializable message, Member member) {
+            return true;
+        }
 
-    private Properties getProperties(byte[] payload) throws IOException {
-        ByteArrayInputStream bin = new ByteArrayInputStream(payload);
-        Properties props = new Properties();
-        props.load(bin);
-        return props;
+        public void messageReceived(Serializable message, Member member) {
+            dispatchMessage(message);
+        }
     }
 
     static class ClusterMemberMemberTuple {

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
  (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
  Fri Dec 31 17:13:10 2010
@@ -17,98 +17,21 @@
  */
 package org.amdatu.core.fabric.osgi;
 
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.UUID;
-
-import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.ClusterMessageService;
-import org.amdatu.core.fabric.cluster.service.ClusterMessageServiceImpl;
-import org.amdatu.core.fabric.cluster.service.tribes.ClusterMemberServiceImpl;
-import org.amdatu.core.fabric.remote.DiscoveryService;
-import org.amdatu.core.fabric.remote.DistributionService;
-import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
-import org.amdatu.core.fabric.remote.RemoteServiceEndPoint;
-import org.amdatu.core.fabric.remote.service.DiscoveryServiceImpl;
-import org.amdatu.core.fabric.remote.service.DistributionServiceImpl;
+import org.amdatu.core.fabric.FabricManagerService;
+import org.amdatu.core.fabric.service.FabricManagerServiceImpl;
 import org.apache.felix.dm.DependencyActivatorBase;
 import org.apache.felix.dm.DependencyManager;
 import org.osgi.framework.BundleContext;
-import org.osgi.service.log.LogService;
 
 public class Activator extends DependencyActivatorBase {
 
     @Override
     public void init(BundleContext context, DependencyManager manager) throws 
Exception {
 
-        // Generic props
-
-        String memberid = UUID.randomUUID().toString();
-        Dictionary<String, Object> cm1props = new Hashtable<String, Object>();
-        cm1props.put(ClusterMemberService.CLUSTER_CLUSTERID_PROP, "CLUSTER1");
-        cm1props.put(ClusterMemberService.CLUSTER_MEMBERID_PROP, memberid);
-        cm1props
-            .put(ClusterMemberServiceImpl.CLUSTER_TRIBES_ARGS_PROP, new 
String[] { "-port", "8880", "-throughput" });
-
-        // ClusterMemberService
-
-        manager.add(
-            createComponent()
-                .setImplementation(new ClusterMemberServiceImpl("CLUSTER1", 
memberid, cm1props))
-                .setInterface(ClusterMemberService.class.getName(), cm1props)
-                
.add(createServiceDependency().setService(LogService.class).setRequired(false)));
-
-        // ClusterMessageService
-
-        manager.add(
-            createComponent()
-                .setImplementation(new ClusterMessageServiceImpl())
-                .setInterface(ClusterMessageService.class.getName(), cm1props)
-                .add(
-                    
createServiceDependency().setService(ClusterMemberService.class).setRequired(true))
-                
.add(createServiceDependency().setService(LogService.class).setRequired(false)));
-
-        // DiscoveryService
-
-        manager.add(
-            createComponent()
-                .setImplementation(new DiscoveryServiceImpl())
-                .setInterface(DiscoveryService.class.getName(), cm1props)
-                .add(
-                    
createServiceDependency().setService(ClusterMemberService.class).setRequired(true))
-                .add(
-                    
createServiceDependency().setService(ClusterMessageService.class).setRequired(true))
-                .add(
-                    
createServiceDependency().setService(RemotableServiceEndpoint.class)
-                        .setCallbacks("remotableServiceEndPointAdded", 
"remotableServiceEndPointRemoved")
-                        .setRequired(false))
-                
.add(createServiceDependency().setService(LogService.class).setRequired(false)));
-
-        // DistributionService
-
-        Dictionary<String, Object> distributionProps = new Hashtable<String, 
Object>();
-
-        manager.add(
-            createComponent()
-                .setImplementation(new DistributionServiceImpl())
-                .setInterface(DistributionService.class.getName(), 
distributionProps)
-                .add(
-                    
createServiceDependency().setService(ClusterMemberService.class).setRequired(true))
-                .add(
-                    
createServiceDependency().setService(ClusterMessageService.class).setRequired(true))
-                .add(
-                    createServiceDependency()
-                        .setService(
-                            "("
-                                + 
DistributionService.SERVICE_EXPORTED_CONFIGS_PROP + "="
-                                + 
DistributionService.SERVICE_CONFIGURATION_TYPE + ")")
-                        .setCallbacks("localRemotableServiceAdded", 
"localRemotableServiceRemoved")
-                        .setRequired(false))
-                .add(
-                    
createServiceDependency().setService(RemoteServiceEndPoint.class)
-                        .setCallbacks("remoteServiceEndPointAdded", 
"remoteServiceEndPointRemoved")
-                        .setRequired(false))
-                
.add(createServiceDependency().setService(LogService.class).setRequired(false)));
+        manager.add(createComponent()
+            .setInterface(FabricManagerService.class.getName(), null)
+            .setImplementation(new FabricManagerServiceImpl())
+            
.add(createConfigurationDependency().setPid(FabricManagerService.CONFIGURATION_PID)));
     }
 
     @Override

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DiscoveryService.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DiscoveryService.java
 (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DiscoveryService.java
 Fri Dec 31 17:13:10 2010
@@ -18,9 +18,6 @@
 
 public interface DiscoveryService {
 
-    String DISCOVERY_TOPIC = "org.amdatu.fabric.remote.DISCOVERY";
-
-    ServiceEndPoint[] getLocalServiceEndPoints();
-
-    ServiceEndPoint[] getRemoteServiceEndPoints();
+    String REMOTE_SERVICEGROUPID_PROP = 
"org.amdatu.fabric.remote.SERVICEGROUP";
+    String EVENT_TOPIC_DISCOVERY = "org/amdatu/fabric/remote/DISCOVERY";
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
      (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
      Fri Dec 31 17:13:10 2010
@@ -19,6 +19,9 @@
 public interface DistributionService {
 
     String REMOTE_TOPIC = "org.amdatu.fabric.remote.REMOTE";
+    String EVENT_TOPIC_REMOTE = "org/amdatu/fabric/remote/REMOTE";
+    String EVENT_TOPIC_INVOKE = "org/amdatu/fabric/remote/INVOKE";
+    String EVENT_TOPIC_RESPONSE = "org/amdatu/fabric/remote/RESPONSE";
 
     // see OSGi R42 spec 13.5.1
     // FIXME we are not actually doing anything to support these intents

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/RemoteServiceEndPoint.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/RemoteServiceEndPoint.java
    (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/RemoteServiceEndPoint.java
    Fri Dec 31 17:13:10 2010
@@ -19,5 +19,4 @@
 public interface RemoteServiceEndPoint {
 
     ServiceEndPoint getServiceEndPoint();
-
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
  (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
  Fri Dec 31 17:13:10 2010
@@ -20,10 +20,13 @@
 import java.util.Enumeration;
 import java.util.Hashtable;
 
-public class ServiceEndPoint implements Serializable {
+public final class ServiceEndPoint implements Serializable {
+
+    private static final long serialVersionUID = 1L;
 
     private String m_clusterId;
     private String m_memberId;
+    private String m_serviceGroup;
     private String[] m_objectClass;
     private long m_originalServiceId;
     private Hashtable<String, Object> m_properties;
@@ -47,6 +50,14 @@
         m_memberId = memberId;
     }
 
+    public String getServiceGroup() {
+        return m_serviceGroup;
+    }
+
+    public void setServiceGroup(String serviceGroup) {
+        m_serviceGroup = serviceGroup;
+    }
+
     public String[] getObjectClass() {
         return m_objectClass;
     }
@@ -74,9 +85,21 @@
     @Override
     public boolean equals(Object obj) {
         ServiceEndPoint other = (ServiceEndPoint) obj;
-        if (!getClusterId().equals(other.getClusterId()))
+        if ((getServiceGroup() == null && other.getServiceGroup() != null)
+                        || (getServiceGroup() != null && 
other.getServiceGroup() == null))
+            return false;
+        if ((getClusterId() == null && other.getClusterId() != null)
+                        || (getClusterId() != null && other.getClusterId() == 
null))
+            return false;
+        if ((getMemberId() == null && other.getMemberId() != null)
+                        || (getMemberId() != null && other.getMemberId() == 
null))
+            return false;
+        if (!(getServiceGroup() == null && other.getServiceGroup() == null) && 
!getServiceGroup().equals(
+            other.getServiceGroup()))
+            return false;
+        if (!(getClusterId() == null && other.getClusterId() == null) && 
!getClusterId().equals(other.getClusterId()))
             return false;
-        if (!getMemberId().equals(other.getMemberId()))
+        if (!(getMemberId() == null && other.getMemberId() == null) && 
!getMemberId().equals(other.getMemberId()))
             return false;
         if (getOriginalServiceId() != other.getOriginalServiceId())
             return false;
@@ -85,7 +108,15 @@
 
     @Override
     public int hashCode() {
-        return (getClusterId() + getMemberId() + 
getOriginalServiceId()).hashCode();
+        String key = "";
+        if (getServiceGroup() != null)
+            key += getServiceGroup();
+        if (getClusterId() != null)
+            key += getClusterId();
+        if (getMemberId() != null)
+            key += getMemberId();
+        key += getOriginalServiceId();
+        return key.hashCode();
     }
 
     @Override

Added: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
==============================================================================
--- (empty file)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
      Fri Dec 31 17:13:10 2010
@@ -0,0 +1,24 @@
+package org.amdatu.core.fabric.remote.internal;
+
+import org.amdatu.core.fabric.remote.DiscoveryService;
+
+/**
+ * Static helpers for building the local discovery event topics.
+ */
+public final class DiscoveryUtilities {
+
+    /** Not instantiable: this class only hosts static helpers. */
+    private DiscoveryUtilities() {
+    }
+
+    /**
+     * Builds the local discovery topic for a cluster group / service group pair.
+     *
+     * @param clusterGroup cluster group identifier, appended as the first topic segment
+     * @param serviceGroup service group identifier, appended as the second topic segment
+     * @return {@code EVENT_TOPIC_DISCOVERY + "/" + clusterGroup + "/" + serviceGroup}
+     */
+    public static String getLocalDiscoveryTopic(final String clusterGroup, final String serviceGroup) {
+        return DiscoveryService.EVENT_TOPIC_DISCOVERY + "/" + clusterGroup + "/" + serviceGroup;
+    }
+}

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DistributionUtilities.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DistributionUtilities.java
   (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DistributionUtilities.java
   Fri Dec 31 17:13:10 2010
@@ -26,6 +26,14 @@
 
 public final class DistributionUtilities {
 
+    public static String getLocalInvokeTopic(final String clusterGroup, final 
String serviceGroup) {
+        return DistributionService.EVENT_TOPIC_INVOKE + "/" + clusterGroup + 
"/" + serviceGroup;
+    }
+
+    public static String getLocalResponseTopic(final String clusterGroup, 
final String serviceGroup) {
+        return DistributionService.EVENT_TOPIC_RESPONSE + "/" + clusterGroup + 
"/" + serviceGroup;
+    }
+
     public static boolean isConfigurationTypeSupported(Dictionary<String, 
Object> serviceRegistrationProperties) {
         if (serviceRegistrationProperties == null) {
             return false;

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
        (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
        Fri Dec 31 17:13:10 2010
@@ -18,17 +18,29 @@
 
 import java.io.Serializable;
 
+import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.RoutableMessage;
 import org.amdatu.core.fabric.remote.ServiceEndPoint;
 
-public class EndpointDepublishMessage implements Serializable {
+public class EndpointDepublishMessage extends RoutableMessage implements 
LocalTopicMessage, Serializable {
 
-    private ServiceEndPoint m_serviceEndPoint;
+    private static final long serialVersionUID = 1L;
+
+    private final ServiceEndPoint m_serviceEndPoint;
 
     public EndpointDepublishMessage(ServiceEndPoint serviceEndPoint) {
+        super(serviceEndPoint.getServiceGroup());
         m_serviceEndPoint = serviceEndPoint;
     }
 
     public ServiceEndPoint getServiceEndPoint() {
+        m_serviceEndPoint.setClusterId(getOriginClusterId());
+        m_serviceEndPoint.setMemberId(getOriginMemberId());
         return m_serviceEndPoint;
     }
+
+    public String getLocalTopic() {
+        return 
DiscoveryUtilities.getLocalDiscoveryTopic(m_serviceEndPoint.getClusterId(),
+            m_serviceEndPoint.getServiceGroup());
+    }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
        (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
        Fri Dec 31 17:13:10 2010
@@ -18,8 +18,22 @@
 
 import java.io.Serializable;
 
-public class EndpointDiscoveryMessage implements Serializable {
+import org.amdatu.core.fabric.cluster.LocalTopicMessage;
 
-    public EndpointDiscoveryMessage() {
+public class EndpointDiscoveryMessage implements LocalTopicMessage, 
Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String m_clusterId;
+    private final String m_serviceGroup;
+
+    public EndpointDiscoveryMessage(String clusterId, String serviceGroup) {
+        m_clusterId = clusterId;
+        m_serviceGroup = serviceGroup;
+    }
+
+    public String getLocalTopic() {
+        return DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterId,
+            m_serviceGroup);
     }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
   (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
   Fri Dec 31 17:13:10 2010
@@ -16,19 +16,21 @@
  */
 package org.amdatu.core.fabric.remote.internal;
 
-import java.io.Serializable;
 import java.util.Map;
 
-import org.amdatu.core.fabric.cluster.internal.RoutedMessage;
+import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.RoutableMessage;
 import org.amdatu.core.fabric.remote.ServiceEndPoint;
 
-public class EndpointInvokeMessage extends RoutedMessage implements 
Serializable {
+public class EndpointInvokeMessage extends RoutableMessage implements 
LocalTopicMessage {
 
-    private ServiceEndPoint m_serviceEndPoint;
-    private Map<String, Object> m_payload;
+    private static final long serialVersionUID = 1L;
+
+    private final ServiceEndPoint m_serviceEndPoint;
+    private final Map<String, Object> m_payload;
 
     public EndpointInvokeMessage(ServiceEndPoint serviceEndPoint, Map<String, 
Object> payload) {
-        super(serviceEndPoint.getClusterId(), serviceEndPoint.getMemberId());
+        super(serviceEndPoint.getClusterId(), serviceEndPoint.getMemberId(), 
serviceEndPoint.getServiceGroup());
         m_serviceEndPoint = serviceEndPoint;
         m_payload = payload;
     }
@@ -40,4 +42,9 @@
     public Map<String, Object> getPayload() {
         return m_payload;
     }
+
+    public String getLocalTopic() {
+        return DistributionUtilities.getLocalInvokeTopic(getTargetClusterId(),
+            getTargetServiceGroup());
+    }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
  (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
  Fri Dec 31 17:13:10 2010
@@ -16,19 +16,29 @@
  */
 package org.amdatu.core.fabric.remote.internal;
 
-import java.io.Serializable;
-
+import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.RoutableMessage;
 import org.amdatu.core.fabric.remote.ServiceEndPoint;
 
-public class EndpointPublishMessage implements Serializable {
+public class EndpointPublishMessage extends RoutableMessage implements 
LocalTopicMessage {
+
+    private static final long serialVersionUID = 1L;
 
     private ServiceEndPoint m_serviceEndPoint;
 
     public EndpointPublishMessage(ServiceEndPoint serviceEndPoint) {
+        super(serviceEndPoint.getServiceGroup());
         m_serviceEndPoint = serviceEndPoint;
     }
 
     public ServiceEndPoint getServiceEndPoint() {
+        m_serviceEndPoint.setClusterId(getOriginClusterId());
+        m_serviceEndPoint.setMemberId(getOriginMemberId());
         return m_serviceEndPoint;
     }
+
+    public String getLocalTopic() {
+        return 
DiscoveryUtilities.getLocalDiscoveryTopic(m_serviceEndPoint.getClusterId(),
+            m_serviceEndPoint.getServiceGroup());
+    }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
 (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
 Fri Dec 31 17:13:10 2010
@@ -16,23 +16,30 @@
  */
 package org.amdatu.core.fabric.remote.internal;
 
-import java.io.Serializable;
 import java.util.Map;
 
-import org.amdatu.core.fabric.cluster.internal.RoutedMessage;
-import org.amdatu.core.fabric.remote.ServiceEndPoint;
+import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.RoutableMessage;
 
-public class EndpointResponseMessage extends RoutedMessage implements 
Serializable {
+public class EndpointResponseMessage extends RoutableMessage implements 
LocalTopicMessage {
+
+    private static final long serialVersionUID = 1L;
 
     private Map<String, Object> m_payload;
 
-    public EndpointResponseMessage(final String originClusterId, final String 
originMemberId,
+    public EndpointResponseMessage(final String targetClusterId, final String 
targetMemberId,
+        final String targetServiceGroup,
         final Map<String, Object> payload) {
-        super(originClusterId, originMemberId);
+        super(targetClusterId, targetMemberId, targetServiceGroup); // target cluster, member and service group are passed to the RoutableMessage constructor
         m_payload = payload;
     }
 
     public Map<String, Object> getPayload() {
         return m_payload;
     }
+
+    public String getLocalTopic() { // local topic is built from the target cluster id and target service group
+        return 
DistributionUtilities.getLocalResponseTopic(getTargetClusterId(),
+            getTargetServiceGroup());
+    }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
   (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
   Fri Dec 31 17:13:10 2010
@@ -19,27 +19,32 @@
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.Hashtable;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.ClusterMessageService;
-import org.amdatu.core.fabric.cluster.ClusterTopicListener;
-import org.amdatu.core.fabric.remote.DistributionService;
 import org.amdatu.core.fabric.remote.ServiceEndPoint;
 import org.amdatu.core.fabric.remote.service.DistributionServiceImpl;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.apache.felix.dm.ServiceDependency;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.osgi.service.log.LogService;
 
 /**
  * I am a delegate to the DistributionService
  * I proxy local service invocations and put them over the cluster
  */
-public final class LocalServiceInvocationHandler implements InvocationHandler, 
ClusterTopicListener {
+public final class LocalServiceInvocationHandler implements InvocationHandler {
 
     private final static long INVOCATION_TIMEOUT = 100;
 
@@ -49,27 +54,32 @@
     private final ReentrantReadWriteLock m_invocationIdentifiersLock = new 
ReentrantReadWriteLock();
     private final ReentrantReadWriteLock m_invocationResponsesLock = new 
ReentrantReadWriteLock();
 
-    private volatile ClusterMemberService m_clusterMemberService;
-    private volatile ClusterMessageService m_clusterMessageService;
+    private volatile DependencyManager m_dependencyManager;
+    private volatile Component m_component;
+    private volatile EventAdmin m_eventAdmin;
+    private volatile LogService m_logService;
+
+    private volatile Component m_serviceInvocationEventHandlerComponent;
 
     private final ServiceEndPoint m_serviceEndpoint;
-    private final Class<?>[] m_interfaceClasses;
-    private final Set<Method> m_interfaceMethods = new HashSet<Method>();
+    private final Set<Method> m_serviceInterfaceMethods = new 
HashSet<Method>();
 
-    private String m_serviceEndpointTopic;
+    private final String m_clusterGroupId;
+    private final String m_serviceGroupId;
 
     /********************************************************
      * Constructors
      ********************************************************/
 
-    public LocalServiceInvocationHandler(ServiceEndPoint serviceEndpoint, 
Class<?>[] interfaceClasses) {
+  public LocalServiceInvocationHandler(String clusterGroupId, String 
serviceGroupId,
+  ServiceEndPoint serviceEndpoint,
+  Class<?>[] interfaceClasses) {
+        m_clusterGroupId = clusterGroupId;
+        m_serviceGroupId = serviceGroupId;
         m_serviceEndpoint = serviceEndpoint;
-        m_interfaceClasses = interfaceClasses;
-        m_serviceEndpointTopic = DistributionService.REMOTE_TOPIC;
-
         for (Class<?> interfaceClass : interfaceClasses) {
             for (Method serviceMethod : interfaceClass.getMethods()) {
-                m_interfaceMethods.add(serviceMethod);
+                m_serviceInterfaceMethods.add(serviceMethod);
             }
         }
     }
@@ -86,25 +96,58 @@
      * Service lifecycle
      ********************************************************/
 
+    public synchronized void init() { // declares the required services and prepares, but does not yet register, the response EventHandler
+        ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
+        logServiceDependency.setService(LogService.class);
+        logServiceDependency.setRequired(true);
+        m_component.add(logServiceDependency); // LogService is a required dependency of this component
+
+        ServiceDependency eventAdminServiceDependency = 
m_dependencyManager.createServiceDependency();
+        eventAdminServiceDependency.setService(EventAdmin.class);
+        eventAdminServiceDependency.setRequired(true);
+        m_component.add(eventAdminServiceDependency); // EventAdmin is a required dependency of this component
+
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(EventConstants.EVENT_TOPIC, // subscribe the handler to the local response topic of this cluster group / service group
+            new String[] { 
DistributionUtilities.getLocalResponseTopic(m_clusterGroupId, m_serviceGroupId) 
});
+        m_serviceInvocationEventHandlerComponent = 
m_dependencyManager.createComponent();
+        
m_serviceInvocationEventHandlerComponent.setInterface(EventHandler.class.getName(),
 props);
+        m_serviceInvocationEventHandlerComponent.setImplementation(new 
ServiceInvocationEventHandler());
+    }
+
+    public synchronized void destroy() {
+    }
+
     public synchronized void start() {
-        m_clusterMessageService.subscribe(this);
+        m_logService.log(LogService.LOG_WARNING, "Starting 
LocalServiceInvocationHandler");
+        m_dependencyManager.add(m_serviceInvocationEventHandlerComponent);
     }
 
     public synchronized void stop() {
-        m_clusterMessageService.unsubscribe(this);
+        m_logService.log(LogService.LOG_WARNING, "Stopping 
LocalServiceInvocationHandler");
+        m_dependencyManager.remove(m_serviceInvocationEventHandlerComponent); // unregisters the response EventHandler component that start() added
     }
 
-
     /********************************************************
      * InvocationHandler
      ********************************************************/
 
     public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
         if (isServiceInterfaceInvocation(method)) {
+
             String invocationIdentifier = createNewInvocationIdentifier();
-            Map<String, Object> payload = 
getInvocationPayload(invocationIdentifier, method, args);
-            m_clusterMessageService.publish(m_serviceEndpointTopic, new 
EndpointInvokeMessage(m_serviceEndpoint,
-                payload));
+            Map<String, Object> messagePayload = 
getInvocationPayload(invocationIdentifier, method, args);
+            EndpointInvokeMessage message = new 
EndpointInvokeMessage(m_serviceEndpoint, messagePayload);
+            // FIXME this is awkward
+            message.setOriginServcieGroup(m_serviceGroupId);
+
+            Dictionary<String, Object> eventPayload = new Hashtable<String, 
Object>();
+            eventPayload.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, 
message);
+            Event event =
+                new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" + 
m_clusterGroupId,
+                    eventPayload);
+            m_eventAdmin.postEvent(event);
+
             Object response = retrieveInvocationResponse(invocationIdentifier);
             return response;
         }
@@ -112,27 +155,6 @@
     }
 
     /********************************************************
-     * ClusterTopicListner
-     ********************************************************/
-
-    public String getTopic() {
-        return m_serviceEndpointTopic;
-    }
-
-    public void recieveMessage(Object message) {
-        if (message instanceof EndpointResponseMessage) {
-            EndpointResponseMessage endpointResponseMessage = 
(EndpointResponseMessage) message;
-            Map<String, Object> payload = endpointResponseMessage.getPayload();
-            String invocationId = (String) 
payload.get(DistributionServiceImpl.MESSAGE_INVOCATION_ID_KEY);
-            if (ownsInvocationIndentifier(invocationId)) {
-                Object response = 
payload.get(DistributionServiceImpl.MESSAGE_INVOCATION_RESPONSE_MAP_KEY);
-                storeResponseObject(invocationId, response);
-                removeInvocationIdentifier(invocationId);
-            }
-        }
-    }
-
-    /********************************************************
      * Object
      ********************************************************/
 
@@ -164,9 +186,13 @@
 
     private String createNewInvocationIdentifier() {
         String invocationId = UUID.randomUUID().toString();
-        synchronized (m_invocationIdentifiers) {
+        m_invocationIdentifiersLock.writeLock().lock();
+        try {
             m_invocationIdentifiers.add(invocationId);
         }
+        finally {
+            m_invocationIdentifiersLock.writeLock().unlock();
+        }
         return invocationId;
     }
 
@@ -175,10 +201,6 @@
         payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ID_KEY, 
invocationId);
         payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_METHODNAME_KEY, 
method.getName());
         payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ARGUMENTS_KEY, 
args);
-        
payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ORIGIN_CLUSTERID_KEY,
-            m_clusterMemberService.getClusterId());
-        
payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ORIGIN_MEMBERID_KEY,
-            m_clusterMemberService.getMemberId());
         return payload;
     }
 
@@ -197,7 +219,8 @@
             }
         }
         if (isResponseTimedOut) {
-            // FIXME now what?
+            m_logService.log(LogService.LOG_WARNING, "Waiting for invocation 
response " + invocationId
+                + " timed out after " + INVOCATION_TIMEOUT + "ms");
         }
         return response;
     }
@@ -258,6 +281,27 @@
 
     private boolean isServiceInterfaceInvocation(final Method method) {
         // no lock needed
-        return m_interfaceMethods.contains(method);
+        return m_serviceInterfaceMethods.contains(method);
+    }
+
+    /********************************************************
+     * Helper classes
+     ********************************************************/
+
+    class ServiceInvocationEventHandler implements EventHandler {
+
+        public void handleEvent(Event event) { // picks invocation responses out of incoming events and stores them by invocation id
+            Object message = 
event.getProperty(ClusterMemberService.EVENT_MESSAGE_PROPERTY);
+            if (message instanceof EndpointResponseMessage) { // any other (or missing) message is silently ignored
+                EndpointResponseMessage endpointResponseMessage = 
(EndpointResponseMessage) message;
+                Map<String, Object> payload = 
endpointResponseMessage.getPayload();
+                String invocationId = (String) 
payload.get(DistributionServiceImpl.MESSAGE_INVOCATION_ID_KEY);
+                if (ownsInvocationIndentifier(invocationId)) { // presumably true only for ids issued by this handler - confirm against the helper
+                    Object response = 
payload.get(DistributionServiceImpl.MESSAGE_INVOCATION_RESPONSE_MAP_KEY);
+                    storeResponseObject(invocationId, response);
+                    removeInvocationIdentifier(invocationId);
+                }
+            }
+        }
     }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
     (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
     Fri Dec 31 17:13:10 2010
@@ -25,18 +25,22 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.ClusterMessageService;
-import org.amdatu.core.fabric.cluster.ClusterTopicListener;
 import org.amdatu.core.fabric.remote.DiscoveryService;
 import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
 import org.amdatu.core.fabric.remote.RemoteServiceEndPoint;
 import org.amdatu.core.fabric.remote.ServiceEndPoint;
+import org.amdatu.core.fabric.remote.internal.DiscoveryUtilities;
 import org.amdatu.core.fabric.remote.internal.EndpointDepublishMessage;
 import org.amdatu.core.fabric.remote.internal.EndpointDiscoveryMessage;
 import org.amdatu.core.fabric.remote.internal.EndpointPublishMessage;
 import org.apache.felix.dm.Component;
 import org.apache.felix.dm.DependencyManager;
+import org.apache.felix.dm.ServiceDependency;
 import org.osgi.framework.ServiceReference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
 import org.osgi.service.log.LogService;
 
 /**
@@ -44,7 +48,7 @@
  * I listen to the cluster and publish local RemoteServiceEndpoint services 
for them
  * TODO support LOOKUP requests for more fine grained discovery (use 
ListenerHook)
  */
-public class DiscoveryServiceImpl implements DiscoveryService, 
ClusterTopicListener {
+public final class DiscoveryServiceImpl implements DiscoveryService {
 
     private final Set<ServiceEndPoint> m_remotableEndPoints = new 
HashSet<ServiceEndPoint>();
     private final ReentrantReadWriteLock m_remotableEndPointsLock = new 
ReentrantReadWriteLock();
@@ -53,35 +57,92 @@
         new HashMap<ServiceEndPoint, Component>();
     private final ReentrantReadWriteLock m_remoteEndPointComponentsLock = new 
ReentrantReadWriteLock();
 
-    private volatile ClusterMemberService m_clusterMemberService;
-    private volatile ClusterMessageService m_clusterMessageService;
     private volatile DependencyManager m_dependencyManager;
     private volatile Component m_component;
+    private volatile EventAdmin m_evenAdmin;
     private volatile LogService m_logService;
 
+    private volatile Component m_discoveryEventHandlerComponent;
+
+    private final String m_clusterGroupId;
+    private final String m_serviceGroupId;
+
     /********************************************************
      * Constructors
      ********************************************************/
 
-    public DiscoveryServiceImpl() {
-
+    public DiscoveryServiceImpl(String clusterGroupId, String serviceGroupId) {
+        m_clusterGroupId = clusterGroupId;
+        m_serviceGroupId = serviceGroupId;
     }
 
     /********************************************************
      * Life cycle
      ********************************************************/
 
+    public synchronized void init() {
+        @SuppressWarnings("unchecked")
+        Dictionary<String, Object> discoveryProps = 
m_component.getServiceProperties();
+        if (discoveryProps == null) {
+            discoveryProps = new Hashtable<String, Object>();
+        }
+        discoveryProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, 
m_clusterGroupId);
+        discoveryProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP, 
m_serviceGroupId);
+        m_component.setServiceProperties(discoveryProps);
+
+        ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
+        logServiceDependency.setService(LogService.class);
+        logServiceDependency.setRequired(true);
+        m_component.add(logServiceDependency);
+
+        ServiceDependency eventAdminServiceDependency = 
m_dependencyManager.createServiceDependency();
+        eventAdminServiceDependency.setService(EventAdmin.class);
+        eventAdminServiceDependency.setRequired(true);
+        m_component.add(eventAdminServiceDependency);
+
+        ServiceDependency clusterMemberDependency = 
m_dependencyManager.createServiceDependency();
+        clusterMemberDependency
+            .setService(
+                ClusterMemberService.class,
+                "(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "=" + 
m_clusterGroupId + ")")
+            .setRequired(true);
+        m_component.add(clusterMemberDependency);
+
+        ServiceDependency remotableServiceEndpointsDependecy = 
m_dependencyManager.createServiceDependency();
+        remotableServiceEndpointsDependecy
+               .setService(RemotableServiceEndpoint.class,
+                   "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "="
+                       + m_clusterGroupId + ")(" + 
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+                       + m_serviceGroupId + "))")
+               .setCallbacks("remotableServiceEndPointAdded", 
"remotableServiceEndPointRemoved")
+               .setRequired(false);
+        m_component.add(remotableServiceEndpointsDependecy);
+
+        Dictionary<String, Object> eventHandlerProps = new Hashtable<String, 
Object>();
+        eventHandlerProps.put(EventConstants.EVENT_TOPIC,
+            new String[] { 
DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterGroupId, m_serviceGroupId) 
});
+        m_discoveryEventHandlerComponent = 
m_dependencyManager.createComponent();
+        
m_discoveryEventHandlerComponent.setInterface(EventHandler.class.getName(), 
eventHandlerProps);
+        m_discoveryEventHandlerComponent.setImplementation(new 
DiscoveryEventHandler());
+    }
+
+    public synchronized void destroy() {
+        removeRemoteEndpointComponents();
+    }
+
     public synchronized void start() {
-        m_clusterMessageService.subscribe(this);
-        m_clusterMessageService.publish(DISCOVERY_TOPIC, new 
EndpointDiscoveryMessage());
+        m_logService.log(LogService.LOG_INFO, "Starting " + toString());
+        m_dependencyManager.add(m_discoveryEventHandlerComponent);
+        m_evenAdmin.postEvent(createdDiscoveryEvent());
     }
 
     public synchronized void stop() {
-        m_clusterMessageService.unsubscribe(this);
+        m_logService.log(LogService.LOG_INFO, "Stopping " + toString());
+        m_dependencyManager.remove(m_discoveryEventHandlerComponent);
     }
 
     /********************************************************
-     * Callbacks
+     * Dependency callbacks
      ********************************************************/
 
     public void remotableServiceEndPointAdded(
@@ -92,7 +153,7 @@
         try {
             if (!m_remotableEndPoints.contains(serviceEndPoint)) {
                 m_remotableEndPoints.add(serviceEndPoint);
-                m_clusterMessageService.publish(DISCOVERY_TOPIC, new 
EndpointPublishMessage(serviceEndPoint));
+                
m_evenAdmin.postEvent(createEndpointPublishEvent(serviceEndPoint));
             }
             else {
                 throw new IllegalStateException("Unexpected state... needs 
analysis");
@@ -111,8 +172,8 @@
         m_remotableEndPointsLock.writeLock().lock();
         try {
             if (m_remotableEndPoints.contains(serviceEndPoint)) {
-                m_clusterMessageService.publish(DISCOVERY_TOPIC, new 
EndpointDepublishMessage(serviceEndPoint));
                 m_remotableEndPoints.remove(serviceEndPoint);
+                
m_evenAdmin.postEvent(createEndpointDepublishEvent(serviceEndPoint));
             }
             else {
                 throw new IllegalStateException("Unexpected state... needs 
analysis");
@@ -124,97 +185,137 @@
     }
 
     /********************************************************
-     * DiscoveryService
+     * Private methods
      ********************************************************/
 
-    public ServiceEndPoint[] getLocalServiceEndPoints() {
-        m_remotableEndPointsLock.readLock().lock();
+    private Event createdDiscoveryEvent() {
+        Dictionary<String, Object> eventProps = new Hashtable<String, 
Object>();
+        eventProps.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY,
+            new EndpointDiscoveryMessage(m_clusterGroupId, m_serviceGroupId));
+        Event discoveryEvent =
+            new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" + 
m_clusterGroupId,
+                eventProps);
+        return discoveryEvent;
+    }
+
+    private Event createEndpointPublishEvent(ServiceEndPoint serviceEndPoint) {
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, new 
EndpointPublishMessage(serviceEndPoint));
+        Event event =
+            new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" + 
m_clusterGroupId,
+                props);
+        return event;
+    }
+
+    private Event createEndpointDepublishEvent(ServiceEndPoint 
serviceEndPoint) {
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, new 
EndpointDepublishMessage(serviceEndPoint));
+        Event event =
+            new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" + 
m_clusterGroupId,
+                props);
+        return event;
+    }
+
+    private void removeRemoteEndpointComponents() {
+        m_remoteEndPointComponentsLock.writeLock().lock();
         try {
-            return m_remotableEndPoints.toArray(new 
ServiceEndPoint[m_remotableEndPoints.size()]);
+            for (Component remoteEndPointComponent : 
m_remoteEndPointComponents.values()) {
+                m_dependencyManager.remove(remoteEndPointComponent);
+            }
+            m_remoteEndPointComponents.clear();
         }
         finally {
-            m_remotableEndPointsLock.readLock().unlock();
+            m_remoteEndPointComponentsLock.writeLock().unlock();
         }
     }
 
-    public ServiceEndPoint[] getRemoteServiceEndPoints() {
-        m_remoteEndPointComponentsLock.readLock().lock();
+    private void recieveEndpointDiscoveryMessage(final 
EndpointDiscoveryMessage endpointDiscoveryMessage) {
+        m_remotableEndPointsLock.writeLock().lock();
         try {
-            return m_remoteEndPointComponents.keySet().toArray(
-                new 
ServiceEndPoint[m_remoteEndPointComponents.keySet().size()]);
+            for (ServiceEndPoint serviceEndPoint : m_remotableEndPoints) {
+                Event event = createEndpointPublishEvent(serviceEndPoint);
+                m_evenAdmin.postEvent(event);
+            }
         }
         finally {
-            m_remoteEndPointComponentsLock.readLock().unlock();
+            m_remotableEndPointsLock.writeLock().unlock();
         }
+        return;
     }
 
-    /********************************************************
-     * ClusterTopicListner
-     ********************************************************/
+    private void recieveEndpointDeplublishMessage(EndpointDepublishMessage 
endpointDepublishMessage) {
+        ServiceEndPoint serviceEndPoint = 
endpointDepublishMessage.getServiceEndPoint();
+        Component serviceComponent;
+        m_remoteEndPointComponentsLock.writeLock().lock();
+        try {
+            if (!m_remoteEndPointComponents.containsKey(serviceEndPoint))
+                return;
+            serviceComponent = 
m_remoteEndPointComponents.remove(serviceEndPoint);
+        }
+        finally {
+            m_remoteEndPointComponentsLock.writeLock().unlock();
+        }
+        if (serviceComponent != null)
+            m_dependencyManager.remove(serviceComponent);
+        return;
+    }
 
-    public void recieveMessage(Object message) {
-        if (message instanceof EndpointPublishMessage) {
-            EndpointPublishMessage endpointPublishMessage = 
(EndpointPublishMessage) message;
-            ServiceEndPoint serviceEndPoint = 
endpointPublishMessage.getServiceEndPoint();
-            m_remoteEndPointComponentsLock.writeLock().lock();
-            try {
-                if (m_remoteEndPointComponents.containsKey(serviceEndPoint))
-                    return;
-                Dictionary<String, Object> distributionProps = new 
Hashtable<String, Object>();
-                
distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERID_PROP,
-                    m_clusterMemberService.getClusterId());
-                
distributionProps.put(ClusterMemberService.CLUSTER_MEMBERID_PROP, 
m_clusterMemberService.getMemberId());
-                Component serviceComponent =
-                    m_dependencyManager.createComponent()
-                        .setInterface(RemoteServiceEndPoint.class.getName(), 
distributionProps)
-                        .setImplementation(new 
RemoteServiceEndPointImpl(serviceEndPoint));
-                
serviceComponent.add(m_dependencyManager.createServiceDependency().setService(
-                    DiscoveryService.class,
-                    "(&(" + ClusterMemberService.CLUSTER_CLUSTERID_PROP + "="
-                        + 
m_component.getServiceProperties().get(ClusterMemberService.CLUSTER_CLUSTERID_PROP)
 + ")("
-                        + ClusterMemberService.CLUSTER_MEMBERID_PROP + "="
-                        + 
m_component.getServiceProperties().get(ClusterMemberService.CLUSTER_MEMBERID_PROP)
 + "))")
-                    .setRequired(true));
-                m_dependencyManager.add(serviceComponent);
-                m_remoteEndPointComponents.put(serviceEndPoint, 
serviceComponent);
-            }
-            finally {
-                m_remoteEndPointComponentsLock.writeLock().unlock();
-            }
-            return;
-        }
-        if (message instanceof EndpointDepublishMessage) {
-            EndpointDepublishMessage endpointDepublishMessage = 
(EndpointDepublishMessage) message;
-            ServiceEndPoint serviceEndPoint = 
endpointDepublishMessage.getServiceEndPoint();
-            m_remoteEndPointComponentsLock.writeLock().lock();
-            try {
-                if (!m_remoteEndPointComponents.containsKey(serviceEndPoint))
-                    return;
-                Component serviceComponent = 
m_remoteEndPointComponents.remove(serviceEndPoint);
-                m_dependencyManager.remove(serviceComponent);
-            }
-            finally {
-                m_remoteEndPointComponentsLock.writeLock().unlock();
-            }
-            return;
-        }
-        if (message instanceof EndpointDiscoveryMessage) {
-            m_remotableEndPointsLock.writeLock().lock();
-            try {
-                for (ServiceEndPoint serviceEndPoint : m_remotableEndPoints) {
-                    m_clusterMessageService.publish(DISCOVERY_TOPIC, new 
EndpointPublishMessage(serviceEndPoint));
-                }
-            }
-            finally {
-                m_remotableEndPointsLock.writeLock().unlock();
+    private void recieveEndpointPublishMessage(EndpointPublishMessage 
endpointPublishMessage) {
+        ServiceEndPoint serviceEndPoint = 
endpointPublishMessage.getServiceEndPoint();
+        Component serviceComponent = null;
+        m_remoteEndPointComponentsLock.writeLock().lock();
+        try {
+            if (m_remoteEndPointComponents.containsKey(serviceEndPoint)) {
+                return;
             }
-            return;
+            Dictionary<String, Object> distributionProps = new 
Hashtable<String, Object>();
+            
distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
+                m_clusterGroupId);
+            distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
+                m_serviceGroupId);
+            serviceComponent =
+                m_dependencyManager.createComponent()
+                    .setInterface(RemoteServiceEndPoint.class.getName(), 
distributionProps)
+                    .setImplementation(new 
RemoteServiceEndPointImpl(serviceEndPoint));
+            
serviceComponent.add(m_dependencyManager.createServiceDependency().setService(
+                DiscoveryService.class,
+                "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "="
+                    + m_clusterGroupId + ")(" + 
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+                    + m_serviceGroupId + "))")
+                .setRequired(true));
+            m_remoteEndPointComponents.put(serviceEndPoint, serviceComponent);
+        }
+        finally {
+            m_remoteEndPointComponentsLock.writeLock().unlock();
         }
-        throw new IllegalStateException("Unknown message type " + 
message.getClass().getName() + "on channel "
-            + DISCOVERY_TOPIC);
+        if (serviceComponent != null) {
+            m_dependencyManager.add(serviceComponent);
+        }
+        return;
     }
 
-    public String getTopic() {
-        return DISCOVERY_TOPIC;
+    /********************************************************
+     * Helper classes
+     ********************************************************/
+
+    class DiscoveryEventHandler implements EventHandler {
+
+        public void handleEvent(Event event) { // dispatches a cluster message to the matching recieve* method by its concrete type
+            Object message = 
event.getProperty(ClusterMemberService.EVENT_MESSAGE_PROPERTY);
+            if (message instanceof EndpointPublishMessage) {
+                recieveEndpointPublishMessage((EndpointPublishMessage) 
message);
+                return;
+            }
+            if (message instanceof EndpointDepublishMessage) {
+                recieveEndpointDeplublishMessage((EndpointDepublishMessage) 
message);
+                return;
+            }
+            if (message instanceof EndpointDiscoveryMessage) {
+                recieveEndpointDiscoveryMessage((EndpointDiscoveryMessage) 
message);
+                return;
+            }
+            throw new IllegalStateException("Unknown message type " + 
(message == null ? "null" : message.getClass().getName()) + " on channel "
+                + DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterGroupId, 
m_serviceGroupId)); // null-safe: an event without a message property must not raise a NullPointerException here
+        }
     }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
  (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
  Fri Dec 31 17:13:10 2010
@@ -26,8 +26,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.ClusterMessageService;
-import org.amdatu.core.fabric.cluster.ClusterTopicListener;
+import org.amdatu.core.fabric.remote.DiscoveryService;
 import org.amdatu.core.fabric.remote.DistributionService;
 import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
 import org.amdatu.core.fabric.remote.RemoteServiceEndPoint;
@@ -39,11 +38,17 @@
 import org.amdatu.core.fabric.remote.internal.LocalServiceInvocationHandler;
 import org.apache.felix.dm.Component;
 import org.apache.felix.dm.DependencyManager;
+import org.apache.felix.dm.ServiceDependency;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceReference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
 import org.osgi.service.log.LogService;
 
-public class DistributionServiceImpl implements DistributionService, 
ClusterTopicListener {
+public class DistributionServiceImpl implements DistributionService {
 
     public final static String MESSAGE_INVOCATION_ID_KEY = "A";
     public final static String MESSAGE_INVOCATION_METHODNAME_KEY = "B";
@@ -52,80 +57,131 @@
     public final static String MESSAGE_INVOCATION_ORIGIN_CLUSTERID_KEY = "E";
     public final static String MESSAGE_INVOCATION_ORIGIN_MEMBERID_KEY = "F";
 
-    private final Map<ServiceEndPoint, ServiceReferenceComponentTuple> 
m_serviceEndPointServiceReferenceComponents =
+    private final Map<ServiceEndPoint, ServiceReferenceComponentTuple> 
m_localServiceEndPointServiceReferenceComponents =
         new HashMap<ServiceEndPoint, ServiceReferenceComponentTuple>();
-    private final ReentrantReadWriteLock 
m_serviceEndPointServiceReferenceComponentsLock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock 
m_localServiceEndPointServiceReferenceComponentsLock =
+        new ReentrantReadWriteLock();
 
-    private final Map<ServiceEndPoint, Component> m_serviceEndPointComponents =
+    private final Map<ServiceEndPoint, Component> 
m_remoteServiceEndPointComponents =
         new HashMap<ServiceEndPoint, Component>();
-    private final ReentrantReadWriteLock m_serviceEndPointComponentsLock = new 
ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock m_remoteServiceEndPointComponentsLock 
= new ReentrantReadWriteLock();
 
-    private volatile ClusterMemberService m_clusterMemberService;
-    private volatile ClusterMessageService m_clusterMessageService;
     private volatile DependencyManager m_dependencyManager;
     private volatile BundleContext m_bundleContext;
     private volatile Component m_component;
+    private volatile EventAdmin m_eventAdmin;
     private volatile LogService m_logService;
 
+    private volatile Component m_distributionEventHandlerComponent;
+
+    private final String m_clusterGroupId;
+    private final String m_serviceGroupId;
+
     /********************************************************
      * Constructors
      ********************************************************/
 
-    public DistributionServiceImpl() {
+        public DistributionServiceImpl(String clusterGroupId, String 
serviceGroupId) {
+        m_clusterGroupId = clusterGroupId;
+        m_serviceGroupId = serviceGroupId;
     }
 
     /********************************************************
-     * Service life cycle methods
+     * Service lifecycle
      ********************************************************/
 
-    public synchronized void start() {
-        m_logService.log(LogService.LOG_WARNING, "Starting 
DistributionService");
+    public synchronized void init() {
+        @SuppressWarnings("unchecked")
         Dictionary<String, Object> distributionProps = 
m_component.getServiceProperties();
-        distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERID_PROP, 
m_clusterMemberService.getClusterId());
-        distributionProps.put(ClusterMemberService.CLUSTER_MEMBERID_PROP, 
m_clusterMemberService.getMemberId());
+        if (distributionProps == null) {
+            distributionProps = new Hashtable<String, Object>();
+        }
+        distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, 
m_clusterGroupId);
+        distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP, 
m_serviceGroupId);
         
distributionProps.put(DistributionService.DISTRIBUTION_CONFIGS_SUPPORTED_PROP,
             DistributionService.DISTRIBUTION_CONFIGS_SUPPORTED);
         
distributionProps.put(DistributionService.DISTRIBUTION_INTENTS_SUPPORTED_PROP,
             DistributionService.DISTRIBUTION_INTENTS_SUPPORTED);
         m_component.setServiceProperties(distributionProps);
-        m_clusterMessageService.subscribe(this);
+
+        ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
+        logServiceDependency.setService(LogService.class);
+        logServiceDependency.setRequired(true);
+        m_component.add(logServiceDependency);
+
+        ServiceDependency eventAdminServiceDependency = 
m_dependencyManager.createServiceDependency();
+        eventAdminServiceDependency.setService(EventAdmin.class);
+        eventAdminServiceDependency.setRequired(true);
+        m_component.add(eventAdminServiceDependency);
+
+        ServiceDependency clusterMemberDependency = 
m_dependencyManager.createServiceDependency();
+        clusterMemberDependency
+            .setService(
+                ClusterMemberService.class,
+                "(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "=" + 
m_clusterGroupId + ")")
+                .setRequired(true);
+        m_component.add(clusterMemberDependency);
+
+        ServiceDependency remotableServicesDependecy = 
m_dependencyManager.createServiceDependency();
+        remotableServicesDependecy
+               .setService(
+                   "("
+                       + DistributionService.SERVICE_EXPORTED_CONFIGS_PROP + 
"="
+                       + DistributionService.SERVICE_CONFIGURATION_TYPE + ")")
+               .setCallbacks("localRemotableServiceAdded", 
"localRemotableServiceRemoved")
+               .setRequired(false);
+        m_component.add(remotableServicesDependecy);
+
+        ServiceDependency remoteServiceEndpointsDependecy = 
m_dependencyManager.createServiceDependency();
+        remoteServiceEndpointsDependecy
+               .setService(RemoteServiceEndPoint.class,
+                   "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "="
+                       + m_clusterGroupId + ")(" + 
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+                       + m_serviceGroupId + "))")
+               .setCallbacks("remoteServiceEndPointAdded", 
"remoteServiceEndPointRemoved")
+               .setRequired(false);
+        m_component.add(remoteServiceEndpointsDependecy);
+
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(EventConstants.EVENT_TOPIC,
+            new String[] { 
DistributionUtilities.getLocalInvokeTopic(m_clusterGroupId, m_serviceGroupId) 
});
+        m_distributionEventHandlerComponent = 
m_dependencyManager.createComponent();
+        
m_distributionEventHandlerComponent.setInterface(EventHandler.class.getName(), 
props);
+        m_distributionEventHandlerComponent.setImplementation(new 
DistributionEventHandler());
+    }
+
+    /**
+     * Removes every component this service created for local and for remote
+     * service endpoints from the dependency manager and forgets them.
+     */
+    public synchronized void destroy() {
+        removeLocalServiceEndPointComponents();
+        removeRemoteServiceEndPointComponents();
+    }
+
+    public synchronized void start() {
+        m_logService.log(LogService.LOG_INFO, "Starting " + toString());
+        m_dependencyManager.add(m_distributionEventHandlerComponent);
+
     }
 
     public synchronized void stop() {
-        m_logService.log(LogService.LOG_WARNING, "Stopping 
DistributionService");
-        m_clusterMessageService.unsubscribe(this);
+        m_logService.log(LogService.LOG_INFO, "Stopping " + toString());
+        m_dependencyManager.remove(m_distributionEventHandlerComponent);
     }
 
     /********************************************************
      * Dependency Callback methods
      ********************************************************/
 
-    public void localRemotableServiceAdded(ServiceReference serviceReference 
/* , Object Service */) {
+    public void localRemotableServiceAdded(final ServiceReference 
serviceReference /* , Object Service */) {
         ServiceEndPoint serviceEndPoint = 
serviceEndPointFromServiceReference(serviceReference);
         if (!isServiceEndpointConfigurationSupported(serviceEndPoint)) {
             m_logService
                 .log(LogService.LOG_WARNING, "Unsupported ServiceEndPoint 
configuration " + serviceEndPoint.toString());
             return;
         }
-        Dictionary<String, Object> distributionProps = new Hashtable<String, 
Object>();
-        distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERID_PROP, 
m_clusterMemberService.getClusterId());
-        distributionProps.put(ClusterMemberService.CLUSTER_MEMBERID_PROP, 
m_clusterMemberService.getMemberId());
-        Component serviceComponent =
-                m_dependencyManager.createComponent()
-                    .setInterface(RemotableServiceEndpoint.class.getName(), 
distributionProps)
-                    .setImplementation(new 
RemotableServiceEndPointImpl(serviceEndPoint));
-        
serviceComponent.add(m_dependencyManager.createServiceDependency().setService(
-            DistributionService.class,
-            "(&(" + ClusterMemberService.CLUSTER_CLUSTERID_PROP + "="
-                + 
m_component.getServiceProperties().get(ClusterMemberService.CLUSTER_CLUSTERID_PROP)
 + ")("
-                + ClusterMemberService.CLUSTER_MEMBERID_PROP + "="
-                + 
m_component.getServiceProperties().get(ClusterMemberService.CLUSTER_MEMBERID_PROP)
 + "))")
-            .setRequired(true));
-
-        m_serviceEndPointServiceReferenceComponentsLock.writeLock().lock();
+        Component serviceComponent = 
createRemotableEndPointComponent(serviceEndPoint);
+        
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().lock();
         try {
-            if 
(!m_serviceEndPointServiceReferenceComponents.containsKey(serviceEndPoint)) {
-                
m_serviceEndPointServiceReferenceComponents.put(serviceEndPoint,
+            if 
(!m_localServiceEndPointServiceReferenceComponents.containsKey(serviceEndPoint))
 {
+                
m_localServiceEndPointServiceReferenceComponents.put(serviceEndPoint,
                         new ServiceReferenceComponentTuple(
                             serviceReference, serviceComponent));
                 m_dependencyManager.add(serviceComponent);
@@ -133,17 +189,17 @@
             }
         }
         finally {
-            
m_serviceEndPointServiceReferenceComponentsLock.writeLock().unlock();
+            
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().unlock();
         }
     }
 
     public void localRemotableServiceRemoved(ServiceReference serviceReference 
/* , Object Service */) {
         ServiceEndPoint serviceEndPoint = 
serviceEndPointFromServiceReference(serviceReference);
-        m_serviceEndPointServiceReferenceComponentsLock.writeLock().lock();
+        
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().lock();
         try {
-            if 
(m_serviceEndPointServiceReferenceComponents.containsKey(serviceEndPoint)) {
+            if 
(m_localServiceEndPointServiceReferenceComponents.containsKey(serviceEndPoint)) 
{
                 ServiceReferenceComponentTuple serviceReferenceComponentTuple =
-                    
m_serviceEndPointServiceReferenceComponents.remove(serviceEndPoint);
+                    
m_localServiceEndPointServiceReferenceComponents.remove(serviceEndPoint);
                 
m_dependencyManager.remove(serviceReferenceComponentTuple.getComponent());
                 m_logService
                     .log(LogService.LOG_WARNING, "Removed local 
ServiceEndPoint: " + serviceEndPoint.toString());
@@ -155,7 +211,7 @@
             }
         }
         finally {
-            
m_serviceEndPointServiceReferenceComponentsLock.writeLock().unlock();
+            
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().unlock();
         }
     }
 
@@ -165,10 +221,10 @@
         Object localServiceProxy = 
createLocalServiceInvocationHandler(serviceEndPoint);
         if (localServiceProxy != null) {
             Component localServiceComponent = 
createLocalServiceComponent(serviceEndPoint, localServiceProxy);
-            m_serviceEndPointComponentsLock.writeLock().lock();
+            m_remoteServiceEndPointComponentsLock.writeLock().lock();
             try {
-                if (!m_serviceEndPointComponents.containsKey(serviceEndPoint)) 
{
-                    m_serviceEndPointComponents.put(serviceEndPoint, 
localServiceComponent);
+                if 
(!m_remoteServiceEndPointComponents.containsKey(serviceEndPoint)) {
+                    m_remoteServiceEndPointComponents.put(serviceEndPoint, 
localServiceComponent);
                     m_dependencyManager.add(localServiceComponent);
                     m_logService.log(LogService.LOG_WARNING,
                         "Added remote ServiceEndPoint: " + 
serviceEndPoint.toString());
@@ -180,7 +236,7 @@
                 }
             }
             finally {
-                m_serviceEndPointComponentsLock.writeLock().unlock();
+                m_remoteServiceEndPointComponentsLock.writeLock().unlock();
             }
         }
     }
@@ -188,10 +244,10 @@
     public void remoteServiceEndPointRemoved(/* ServiceReference 
serviceReference, */Object remoteServiceEndPointObject) {
         RemoteServiceEndPoint remoteServiceEndPoint = (RemoteServiceEndPoint) 
remoteServiceEndPointObject;
         ServiceEndPoint serviceEndPoint = 
remoteServiceEndPoint.getServiceEndPoint();
-        m_serviceEndPointComponentsLock.writeLock().lock();
+        m_remoteServiceEndPointComponentsLock.writeLock().lock();
         try {
-            if (m_serviceEndPointComponents.containsKey(serviceEndPoint)) {
-                Component localServiceComponent = 
m_serviceEndPointComponents.get(serviceEndPoint);
+            if 
(m_remoteServiceEndPointComponents.containsKey(serviceEndPoint)) {
+                Component localServiceComponent = 
m_remoteServiceEndPointComponents.remove(serviceEndPoint);
                 m_dependencyManager.remove(localServiceComponent);
                 m_logService.log(LogService.LOG_WARNING,
                     "Removed remote ServiceEndPoint: " + 
serviceEndPoint.toString());
@@ -203,112 +259,63 @@
             }
         }
         finally {
-            m_serviceEndPointComponentsLock.writeLock().unlock();
+            m_remoteServiceEndPointComponentsLock.writeLock().unlock();
         }
     }
 
     /********************************************************
-     * ClusterTopicListener
+     * Private methods
      ********************************************************/
 
-    public String getTopic() {
-        return DistributionService.REMOTE_TOPIC;
+    /**
+     * Removes the component of every registered local service endpoint from the
+     * dependency manager and empties the local endpoint map, all while holding
+     * the write lock of that map.
+     */
+    private void removeLocalServiceEndPointComponents() {
+        
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().lock();
+        try {
+            for (ServiceReferenceComponentTuple serviceReferenceComponentTuple 
: m_localServiceEndPointServiceReferenceComponents
+                .values()) {
+                
m_dependencyManager.remove(serviceReferenceComponentTuple.getComponent());
+            }
+            m_localServiceEndPointServiceReferenceComponents.clear();
+        }
+        finally {
+            
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().unlock();
+        }
     }
 
-    public void recieveMessage(Object message) {
-        if (message instanceof EndpointInvokeMessage) {
-            EndpointInvokeMessage endpointInvokeMessage = 
(EndpointInvokeMessage) message;
-            ServiceEndPoint serviceEndPoint = 
endpointInvokeMessage.getServiceEndPoint();
-            Map<String, Object> payload = endpointInvokeMessage.getPayload();
-            String invocationId = (String) 
payload.get(MESSAGE_INVOCATION_ID_KEY);
-            String originClusterId = (String) 
payload.get(MESSAGE_INVOCATION_ORIGIN_CLUSTERID_KEY);
-            String originMemberId = (String) 
payload.get(MESSAGE_INVOCATION_ORIGIN_MEMBERID_KEY);
-            String methodName = (String) 
payload.get(MESSAGE_INVOCATION_METHODNAME_KEY);
-            Object[] args = (Object[]) 
payload.get(MESSAGE_INVOCATION_ARGUMENTS_KEY);
-            Class<?>[] types = DistributionUtilities.getTypesFromArgs(args);
-
-            Object serviceResponse = null;
-            m_serviceEndPointServiceReferenceComponentsLock.readLock().lock();
-            try {
-                ServiceReferenceComponentTuple serviceReferenceComponentTuple =
-                    
m_serviceEndPointServiceReferenceComponents.get(serviceEndPoint);
-                if (serviceReferenceComponentTuple == null) {
-                    if 
(serviceEndPoint.getMemberId().equals(m_clusterMemberService.getMemberId())) {
-                        // TODO local service gone.. what to do? Send back 
error to prevent client from waiting...
-                        m_logService
-                            .log(
-                                LogService.LOG_WARNING,
-                                "Dropping EndpointInvokeMessage for unknown 
ServiceEndPoint: "
-                                    + serviceEndPoint.toString());
-                    }
-                    else {
-                        m_logService.log(LogService.LOG_WARNING,
-                            "Dropping EndpointInvokeMessage for other 
clustermember: " + serviceEndPoint.toString());
-                    }
-                    return;
-                }
-                ServiceReference serviceReference = 
serviceReferenceComponentTuple.getServiceReference();
-                try {
-                    Object serviceObject = 
m_bundleContext.getService(serviceReference);
-                    if (serviceObject == null) {
-                        // TODO local service gone.. what to do? Send back 
error to prevent client from waiting...
-                        m_logService
-                            .log(
-                                LogService.LOG_WARNING,
-                                "Dropping EndpointInvokeMessage for 
unavailable service: "
-                                    + serviceEndPoint.toString());
-                        return;
-                    }
-                    Method serviceMethod = 
serviceObject.getClass().getMethod(methodName, types);
-                    serviceResponse = serviceMethod.invoke(serviceObject, 
args);
-
-                    m_bundleContext.ungetService(serviceReference);
-                }
-                catch (SecurityException e) {
-                    // TODO its fooked.. what to do? Send back error to 
prevent client from waiting...
-                    m_logService.log(LogService.LOG_ERROR, "Exception during 
local service invocation", e);
-                    e.printStackTrace();
-                }
-                catch (NoSuchMethodException e) {
-                    // TODO its fooked.. what to do? Send back error to 
prevent client from waiting...
-                    m_logService.log(LogService.LOG_ERROR, "Exception during 
local service invocation", e);
-                    e.printStackTrace();
-                }
-                catch (IllegalArgumentException e) {
-                    // TODO its fooked.. what to do? Send back error to 
prevent client from waiting...
-                    m_logService.log(LogService.LOG_ERROR, "Exception during 
local service invocation", e);
-                    e.printStackTrace();
-                }
-                catch (IllegalAccessException e) {
-                    // TODO its fooked.. what to do? Send back error to 
prevent client from waiting...
-                    m_logService.log(LogService.LOG_ERROR, "Exception during 
local service invocation", e);
-                    e.printStackTrace();
-                }
-                catch (InvocationTargetException e) {
-                    // TODO its fooked.. what to do? Send back error to 
prevent client from waiting...
-                    m_logService.log(LogService.LOG_ERROR, "Exception during 
local service invocation", e);
-                    e.printStackTrace();
-                }
-            }
-            finally {
-                
m_serviceEndPointServiceReferenceComponentsLock.readLock().unlock();
-            }
-            if (serviceResponse != null) {
-                Map<String, Object> responsePayload = new HashMap<String, 
Object>();
-                responsePayload.put(MESSAGE_INVOCATION_ID_KEY, invocationId);
-                responsePayload.put(MESSAGE_INVOCATION_RESPONSE_MAP_KEY, 
serviceResponse);
-                m_clusterMessageService
-                    .publish(REMOTE_TOPIC,
-                        new EndpointResponseMessage(originClusterId, 
originMemberId, responsePayload));
+    /**
+     * Removes the component of every registered remote service endpoint from the
+     * dependency manager and empties the remote endpoint map, all while holding
+     * the write lock of that map.
+     */
+    private void removeRemoteServiceEndPointComponents() {
+        m_remoteServiceEndPointComponentsLock.writeLock().lock();
+        try {
+            for (Component serviceEndPointComponent : 
m_remoteServiceEndPointComponents.values()) {
+                m_dependencyManager.remove(serviceEndPointComponent);
             }
+            m_remoteServiceEndPointComponents.clear();
+        }
+        finally {
+            m_remoteServiceEndPointComponentsLock.writeLock().unlock();
         }
     }
 
-    /********************************************************
-     * private
-     ********************************************************/
+    /**
+     * Builds the component that registers a local service endpoint under the
+     * RemotableServiceEndpoint interface, tagged with the cluster group and
+     * service group of this service. The component is only created here; it is
+     * not added to the dependency manager by this method.
+     *
+     * @param serviceEndPoint the local endpoint to wrap
+     * @return a component with a required dependency on the DistributionService
+     *         registered for the same cluster group and service group
+     */
+    private Component createRemotableEndPointComponent(final ServiceEndPoint serviceEndPoint) {
+        Dictionary<String, Object> endpointProps = new Hashtable<String, Object>();
+        endpointProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, m_clusterGroupId);
+        endpointProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP, m_serviceGroupId);
+
+        Component endpointComponent = m_dependencyManager.createComponent()
+            .setInterface(RemotableServiceEndpoint.class.getName(), endpointProps)
+            .setImplementation(new RemotableServiceEndPointImpl(serviceEndPoint));
+
+        // Filter selecting the DistributionService of this cluster/service group.
+        String distributionServiceFilter =
+            "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "=" + m_clusterGroupId + ")("
+                + DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "=" + m_serviceGroupId + "))";
+        ServiceDependency distributionServiceDependency = m_dependencyManager
+            .createServiceDependency()
+            .setService(DistributionService.class, distributionServiceFilter)
+            .setRequired(true);
+        endpointComponent.add(distributionServiceDependency);
+        return endpointComponent;
+    }
 
-    private Object createLocalServiceInvocationHandler(ServiceEndPoint 
serviceEndpoint) {
+    private Object createLocalServiceInvocationHandler(final ServiceEndPoint 
serviceEndpoint) {
         Class<?>[] interfaceClasses = new 
Class<?>[serviceEndpoint.getObjectClass().length];
         for (int i = 0; i < serviceEndpoint.getObjectClass().length; i++) {
             String interfaceName = serviceEndpoint.getObjectClass()[i];
@@ -322,48 +329,53 @@
             }
         }
         LocalServiceInvocationHandler localServiceInvocationHandler =
-            new LocalServiceInvocationHandler(serviceEndpoint, 
interfaceClasses);
+            new LocalServiceInvocationHandler(m_clusterGroupId, 
m_serviceGroupId, serviceEndpoint,
+                interfaceClasses);
         Object serviceObject = 
Proxy.newProxyInstance(interfaceClasses[0].getClassLoader(),
             interfaceClasses, localServiceInvocationHandler);
         return serviceObject;
     }
 
-    private Component createLocalServiceComponent(ServiceEndPoint 
serviceEndPoint, Object serviceObject) {
-        Hashtable<String, Object> registrationProperties = 
serviceEndPoint.getProperties();
-
-        String[] importedIntents = 
DistributionUtilities.mergeExportedIntents(registrationProperties);
-
-        
registrationProperties.remove(DistributionService.SERVICE_INTENTS_PROP);
-        
registrationProperties.remove(DistributionService.SERVICE_EXPORTED_CONFIGS_PROP);
-        
registrationProperties.remove(DistributionService.SERVICE_EXPORTED_INTERFACES_PROP);
-        
registrationProperties.remove(DistributionService.SERVICE_EXPORTED_INTENTS_PROP);
-        
registrationProperties.remove(DistributionService.SERVICE_EXPORTED_INTENTS_EXTRA_PROP);
-
-        registrationProperties.put(DistributionService.SERVICE_IMPORTED_PROP, 
"true");
-        registrationProperties.put(DistributionService.SERVICE_INTENTS_PROP, 
importedIntents);
-        
registrationProperties.put(DistributionService.SERVICE_IMPORTED_CONFIGS_PROP,
+    private Component createLocalServiceComponent(final ServiceEndPoint 
serviceEndpoint, final Object serviceObject) {
+        Hashtable<String, Object> props = serviceEndpoint.getProperties();
+        String[] importedIntents = 
DistributionUtilities.mergeExportedIntents(props);
+        props.remove(DistributionService.SERVICE_INTENTS_PROP);
+        props.remove(DistributionService.SERVICE_EXPORTED_CONFIGS_PROP);
+        props.remove(DistributionService.SERVICE_EXPORTED_INTERFACES_PROP);
+        props.remove(DistributionService.SERVICE_EXPORTED_INTENTS_PROP);
+        props.remove(DistributionService.SERVICE_EXPORTED_INTENTS_EXTRA_PROP);
+        props.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, 
m_clusterGroupId);
+        props.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP, 
m_serviceGroupId);
+        props.put(DistributionService.SERVICE_IMPORTED_PROP, "true");
+        props.put(DistributionService.SERVICE_INTENTS_PROP, importedIntents);
+        props.put(DistributionService.SERVICE_IMPORTED_CONFIGS_PROP,
             DistributionService.SERVICE_CONFIGURATION_TYPE);
-
-        Component component = m_dependencyManager.createComponent()
-            .setInterface(serviceEndPoint.getObjectClass(), 
registrationProperties)
+        Component serviceComponent = m_dependencyManager.createComponent()
+            .setInterface(serviceEndpoint.getObjectClass(), props)
             .setImplementation(serviceObject);
-        
component.add(m_dependencyManager.createServiceDependency().setService(ClusterMemberService.class)
-            .setRequired(true));
-        
component.add(m_dependencyManager.createServiceDependency().setService(ClusterMessageService.class)
-            .setRequired(true));
-        return component;
+        ServiceDependency serviceDependency =
+            m_dependencyManager
+                .createServiceDependency()
+                .setService(
+                    DistributionService.class,
+                    "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + 
"="
+                        + m_clusterGroupId + ")(" + 
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+                        + m_serviceGroupId + "))")
+                .setRequired(true);
+        serviceComponent.add(serviceDependency);
+        return serviceComponent;
     }
 
-    private boolean isServiceEndpointConfigurationSupported(ServiceEndPoint 
serviceEndPoint) {
-        if 
(!DistributionUtilities.isConfigurationTypeSupported(serviceEndPoint.getProperties()))
 {
+    private boolean isServiceEndpointConfigurationSupported(ServiceEndPoint 
serviceEndpoint) {
+        if 
(!DistributionUtilities.isConfigurationTypeSupported(serviceEndpoint.getProperties()))
 {
             System.err.println("No supported configuration type");
             return false;
         }
-        if 
(!DistributionUtilities.isExportedIntentsListSupported(serviceEndPoint.getProperties()))
 {
+        if 
(!DistributionUtilities.isExportedIntentsListSupported(serviceEndpoint.getProperties()))
 {
             System.err.println("Not all intents supported");
             return false;
         }
-        if 
(!DistributionUtilities.isExportedInterfacesSupported(serviceEndPoint.getProperties(),
+        if 
(!DistributionUtilities.isExportedInterfacesSupported(serviceEndpoint.getProperties(),
             new ClassLoaderAdaptor() {
                 public Class<?> loadClass(String className) throws 
ClassNotFoundException {
                     return m_bundleContext.getBundle().loadClass(className);
@@ -375,13 +387,13 @@
         return true;
     }
 
-    private ServiceEndPoint 
serviceEndPointFromServiceReference(ServiceReference serviceReference) {
+    private ServiceEndPoint serviceEndPointFromServiceReference(final 
ServiceReference serviceReference) {
         ServiceEndPoint serviceEndPoint = new ServiceEndPoint();
-        serviceEndPoint.setClusterId(m_clusterMemberService.getClusterId());
-        serviceEndPoint.setMemberId(m_clusterMemberService.getMemberId());
+        serviceEndPoint.setClusterId(m_clusterGroupId);
+        serviceEndPoint.setServiceGroup(m_serviceGroupId);
         serviceEndPoint.setObjectClass((String[]) serviceReference
             
.getProperty(DistributionService.SERVICE_EXPORTED_INTERFACES_PROP));
-        serviceEndPoint.setOriginalServiceId((Long) 
serviceReference.getProperty("service.id"));
+        serviceEndPoint.setOriginalServiceId((Long) 
serviceReference.getProperty(Constants.SERVICE_ID));
         Hashtable<String, Object> properties = new Hashtable<String, Object>();
         for (String key : serviceReference.getPropertyKeys()) {
             properties.put(key, serviceReference.getProperty(key));
@@ -390,6 +402,119 @@
         return serviceEndPoint;
     }
 
+    /**
+     * Handles an incoming {@link EndpointInvokeMessage}: looks up the local service registered for the
+     * message's {@link ServiceEndPoint}, invokes it and, when the invocation produced a non-null result,
+     * posts a response event through the event admin.
+     * <p>
+     * The lookup and the invocation both happen while holding the read lock on the local endpoint
+     * registry. Messages for endpoints that are not registered locally are logged and dropped.
+     *
+     * @param endpointInvokeMessage the invocation request to process
+     */
+    private void recieveEndpointInvokeMessage(EndpointInvokeMessage endpointInvokeMessage) {
+
+        ServiceEndPoint serviceEndPoint = endpointInvokeMessage.getServiceEndPoint();
+        // FIXME address this: the member id is cleared before the lookup, presumably because the local
+        // registry keys its endpoints without a member id -- to be confirmed.
+        serviceEndPoint.setMemberId(null);
+        Object serviceResponse = null;
+
+        m_localServiceEndPointServiceReferenceComponentsLock.readLock().lock();
+        try {
+            ServiceReferenceComponentTuple serviceReferenceComponentTuple =
+                m_localServiceEndPointServiceReferenceComponents.get(serviceEndPoint);
+            if (serviceReferenceComponentTuple == null) {
+                // FIXME check is not ok: it compares a member id with a cluster group id. Because the
+                // member id has been cleared above it must at least be evaluated null-safely, otherwise
+                // this diagnostic path fails with a NullPointerException instead of logging the drop.
+                if (serviceEndPoint.getMemberId() != null
+                    && serviceEndPoint.getMemberId().equals(m_clusterGroupId)) {
+                    m_logService.log(LogService.LOG_WARNING,
+                        "Dropping EndpointInvokeMessage for unknown ServiceEndPoint: " + serviceEndPoint.toString());
+                }
+                else {
+                    m_logService.log(LogService.LOG_WARNING,
+                        "Dropping EndpointInvokeMessage for other clustermember: " + serviceEndPoint.toString());
+                }
+                return;
+            }
+            serviceResponse =
+                invokeService(endpointInvokeMessage, serviceReferenceComponentTuple.getServiceReference());
+        }
+        finally {
+            m_localServiceEndPointServiceReferenceComponentsLock.readLock().unlock();
+        }
+        // A null result (void method, null return value or failed invocation) produces no response event.
+        if (serviceResponse != null) {
+            m_eventAdmin.postEvent(createEndpointResponseEvent(endpointInvokeMessage, serviceResponse));
+        }
+    }
+
+    private Object invokeService(final EndpointInvokeMessage 
endpointInvokeMessage,
+        final ServiceReference serviceReference) {
+
+        ServiceEndPoint serviceEndPoint = 
endpointInvokeMessage.getServiceEndPoint();
+        Map<String, Object> payload = endpointInvokeMessage.getPayload();
+        String methodName = (String) 
payload.get(MESSAGE_INVOCATION_METHODNAME_KEY);
+        Object[] args = (Object[]) 
payload.get(MESSAGE_INVOCATION_ARGUMENTS_KEY);
+        Class<?>[] types = DistributionUtilities.getTypesFromArgs(args);
+        Object serviceObject = m_bundleContext.getService(serviceReference);
+        if (serviceObject == null) {
+            m_logService.log(LogService.LOG_WARNING, "Dropping 
EndpointInvokeMessage for unavailable service: "
+                            + serviceEndPoint.toString());
+            return null;
+        }
+        try {
+            Method serviceMethod = 
serviceObject.getClass().getMethod(methodName, types);
+            Object serviceResponse = serviceMethod.invoke(serviceObject, args);
+            return serviceResponse;
+        }
+        catch (SecurityException e) {
+            m_logService.log(LogService.LOG_ERROR, "Exception during local 
service invocation", e);
+        }
+        catch (NoSuchMethodException e) {
+            m_logService.log(LogService.LOG_ERROR, "Exception during local 
service invocation", e);
+        }
+        catch (IllegalArgumentException e) {
+            m_logService.log(LogService.LOG_ERROR, "Exception during local 
service invocation", e);
+        }
+        catch (IllegalAccessException e) {
+            m_logService.log(LogService.LOG_ERROR, "Exception during local 
service invocation", e);
+        }
+        catch (InvocationTargetException e) {
+            m_logService.log(LogService.LOG_ERROR, "Exception during local 
service invocation", e);
+        }
+        finally {
+            m_bundleContext.ungetService(serviceReference);
+        }
+        return null;
+    }
+
+    /**
+     * Builds the event that carries the result of a service invocation.
+     * <p>
+     * The response payload holds the invocation id copied from the request together with the service
+     * response. It is wrapped in an {@link EndpointResponseMessage} constructed with the origin cluster id,
+     * member id and service group of the request, and published on the broadcast topic of this
+     * service's cluster group.
+     *
+     * @param endpointInvokeMessage the request that was handled
+     * @param serviceResponse the value returned by the invoked service
+     * @return the event to post
+     */
+    private Event createEndpointResponseEvent(EndpointInvokeMessage endpointInvokeMessage, Object serviceResponse) {
+        String replyInvocationId = (String) endpointInvokeMessage.getPayload().get(MESSAGE_INVOCATION_ID_KEY);
+        String replyClusterId = (String) endpointInvokeMessage.getOriginClusterId();
+        String replyMemberId = (String) endpointInvokeMessage.getOriginMemberId();
+        String replyServiceGroup = (String) endpointInvokeMessage.getOriginServiceGroup();
+
+        Map<String, Object> replyPayload = new HashMap<String, Object>();
+        replyPayload.put(MESSAGE_INVOCATION_ID_KEY, replyInvocationId);
+        replyPayload.put(MESSAGE_INVOCATION_RESPONSE_MAP_KEY, serviceResponse);
+
+        EndpointResponseMessage replyMessage =
+            new EndpointResponseMessage(replyClusterId, replyMemberId, replyServiceGroup, replyPayload);
+
+        Dictionary<String, Object> eventProperties = new Hashtable<String, Object>();
+        eventProperties.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, replyMessage);
+
+        return new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" + m_clusterGroupId, eventProperties);
+    }
+
+    /********************************************************
+     * Helper classes
+     ********************************************************/
+
+    /**
+     * Event handler that dispatches incoming cluster events to this distribution service. Only
+     * {@link EndpointInvokeMessage} payloads are understood.
+     */
+    class DistributionEventHandler implements EventHandler {
+
+        /**
+         * Extracts the message from the event and hands it to the enclosing service.
+         *
+         * @param event the received event
+         * @throws IllegalStateException if the event carries no message or a message of an unknown type
+         */
+        public void handleEvent(Event event) {
+            Object message = event.getProperty(ClusterMemberService.EVENT_MESSAGE_PROPERTY);
+            if (message instanceof EndpointInvokeMessage) {
+                recieveEndpointInvokeMessage((EndpointInvokeMessage) message);
+                return;
+            }
+            // An event without a message property must be reported as such rather than failing with a
+            // NullPointerException while the error text is being built.
+            String messageType = (message == null) ? "null" : message.getClass().getName();
+            throw new IllegalStateException("Unknown message type " + messageType + " on channel "
+                + REMOTE_TOPIC);
+        }
+    }
+
     static class ServiceReferenceComponentTuple {
         private final ServiceReference m_serviceReference;
         private final Component m_component;

Added: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
==============================================================================
--- (empty file)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
        Fri Dec 31 17:13:10 2010
@@ -0,0 +1,267 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.service;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.amdatu.core.fabric.FabricManagerService;
+import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import 
org.amdatu.core.fabric.cluster.service.tribes.TribesClusterMemberServiceImpl;
+import org.amdatu.core.fabric.remote.DiscoveryService;
+import org.amdatu.core.fabric.remote.DistributionService;
+import org.amdatu.core.fabric.remote.service.DiscoveryServiceImpl;
+import org.amdatu.core.fabric.remote.service.DistributionServiceImpl;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.apache.felix.dm.ServiceDependency;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.log.LogService;
+
+public class FabricManagerServiceImpl implements FabricManagerService {
+
+    private final Map<String, Component> m_clusterMemberComponents = new 
HashMap<String, Component>();
+    private final ReentrantReadWriteLock m_clusterMemberComponentsLock = new 
ReentrantReadWriteLock();
+
+    private final Map<String, Component> m_discoveryComponents = new 
HashMap<String, Component>();
+    private final ReentrantReadWriteLock m_discoveryComponentsLock = new 
ReentrantReadWriteLock();
+
+    private final Map<String, Component> m_distributionComponents = new 
HashMap<String, Component>();
+    private final ReentrantReadWriteLock m_distributionComponentsLock = new 
ReentrantReadWriteLock();
+
+    private volatile DependencyManager m_dependencyManager;
+    private volatile Component m_component;
+    private volatile LogService m_logService;
+
+    /********************************************************
+     * Service lifecycle
+     ********************************************************/
+
+    public final synchronized void init() {
+        ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
+        logServiceDependency.setService(LogService.class);
+        logServiceDependency.setRequired(true);
+        m_component.add(logServiceDependency);
+    }
+
+    public final synchronized void destroy() {
+    }
+
+    public synchronized void start() {
+        m_logService.log(LogService.LOG_WARNING, "Amdatu Service Fabric 
Manager started");
+    }
+
+    public synchronized void stop() {
+        m_logService.log(LogService.LOG_WARNING, "Amdatu Service Fabric 
Manager stopped");
+    }
+
+    /********************************************************
+     * ManagedService (controlled by dependencymanager)
+     ********************************************************/
+
+    public synchronized void updated(Dictionary<String, Object> dictionary) 
throws ConfigurationException {
+        if (dictionary != null) {
+            String clusternamesValue = (String) 
dictionary.get("org.amdatu.fabric.groupchannels");
+            if (clusternamesValue != null) {
+                String[] clusterNames = clusternamesValue.split(",");
+                for (String clusterName : clusterNames) {
+                    String clusterGroupId =
+                        (String) dictionary.get("org.amdatu.fabric." + 
clusterName + ".groupchannel");
+                    String clusterMemberId = (String) 
dictionary.get("org.amdatu.fabric." + clusterName + ".memberid");
+                    String args = (String) dictionary.get("org.amdatu.fabric." 
+ clusterName + ".tribes.args");
+                    Dictionary<String, Object> properties = new 
Hashtable<String, Object>();
+                    properties
+                        
.put(TribesClusterMemberServiceImpl.CLUSTER_TRIBES_ARGS_PROP, args.split(" "));
+
+                    createClusterMember(clusterGroupId, clusterMemberId, 
properties);
+                }
+            }
+
+            String remotenamesValue = (String) 
dictionary.get("org.amdatu.fabric.servicegroups");
+            if (remotenamesValue != null) {
+                String[] remoteNames = remotenamesValue.split(",");
+                for (String remoteName : remoteNames) {
+                    String clusterGroupId =
+                        (String) dictionary.get("org.amdatu.fabric." + 
remoteName + ".groupchannel");
+                    String serviceGroupId =
+                        (String) dictionary.get("org.amdatu.fabric." + 
remoteName + ".servicegroup");
+
+                    createDistribution(clusterGroupId, serviceGroupId);
+                    createDiscovery(clusterGroupId, serviceGroupId);
+                }
+            }
+        }
+        if (m_logService != null)
+            m_logService.log(LogService.LOG_WARNING, "Amdatu Fabric updated");
+    }
+
+    /********************************************************
+     * FabricManagerService interface
+     ********************************************************/
+
+    public boolean createClusterMember(String clusterGroupId, String 
clusterMemberId,
+        Dictionary<String, Object> properties) {
+        if (clusterMemberId == null || "".equals(clusterMemberId)) {
+            return false;
+        }
+
+        Dictionary<String, Object> svcProperties = new Hashtable<String, 
Object>();
+        svcProperties.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, 
clusterMemberId);
+        svcProperties.put(ClusterMemberService.CLUSTER_CLUSTERMEMBER_PROP, 
clusterMemberId);
+
+        Component clusterMemberComponent =
+            m_dependencyManager.createComponent()
+                .setInterface(ClusterMemberService.class.getName(), properties)
+                .setImplementation(new 
TribesClusterMemberServiceImpl(clusterGroupId, clusterMemberId, properties))
+                .add(
+                    m_dependencyManager.createServiceDependency()
+                        .setService(FabricManagerService.class)
+                        .setRequired(true));
+
+        m_clusterMemberComponentsLock.writeLock().lock();
+        try {
+            if (m_clusterMemberComponents.containsKey(getKey(clusterGroupId, 
clusterMemberId, ""))) {
+                m_dependencyManager.remove(m_clusterMemberComponents
+                    .remove(getKey(clusterGroupId, clusterMemberId, "")));
+            }
+            m_clusterMemberComponents.put(getKey(clusterGroupId, 
clusterMemberId, ""), clusterMemberComponent);
+            m_dependencyManager.add(clusterMemberComponent);
+            return true;
+        }
+        finally {
+            m_clusterMemberComponentsLock.writeLock().unlock();
+        }
+    }
+
+    public boolean removeClusterMember(String clusterGroupId, String 
clusterMemberId) {
+        if (clusterMemberId == null || "".equals(clusterMemberId)) {
+            return false;
+        }
+
+        m_clusterMemberComponentsLock.writeLock().lock();
+        try {
+            if (m_clusterMemberComponents.containsKey(getKey(clusterGroupId, 
clusterMemberId, ""))) {
+                m_dependencyManager.remove(m_clusterMemberComponents
+                    .remove(getKey(clusterGroupId, clusterMemberId, "")));
+                return true;
+            }
+            return false;
+        }
+        finally {
+            m_clusterMemberComponentsLock.writeLock().unlock();
+        }
+    }
+
+    public boolean createDiscovery(String clusterGroupId, String 
serviceGroupId) {
+        // FIXME improve checking
+
+        Component discoveryComponent =
+            m_dependencyManager
+                .createComponent()
+                .setInterface(DiscoveryService.class.getName(),
+                    null)
+                .setImplementation(new DiscoveryServiceImpl(clusterGroupId, 
serviceGroupId))
+                .add(
+                    m_dependencyManager.createServiceDependency()
+                        .setService(FabricManagerService.class)
+                        .setRequired(true));
+
+        m_discoveryComponentsLock.writeLock().lock();
+        try {
+            if (m_discoveryComponents.containsKey(getKey(clusterGroupId, "", 
serviceGroupId))) {
+                
m_dependencyManager.remove(m_discoveryComponents.remove(getKey(clusterGroupId, 
"",
+                    serviceGroupId)));
+            }
+            m_discoveryComponents.put(getKey(clusterGroupId, "", 
serviceGroupId), discoveryComponent);
+            m_dependencyManager.add(discoveryComponent);
+            return true;
+        }
+        finally {
+            m_discoveryComponentsLock.writeLock().unlock();
+        }
+    }
+
+    public boolean removeDiscovery(String clusterGroupId, String 
serviceGroupId) {
+        // FIXME improve checks
+
+        m_discoveryComponentsLock.writeLock().lock();
+        try {
+            if (m_discoveryComponents.containsKey(getKey(clusterGroupId, "", 
serviceGroupId))) {
+                
m_dependencyManager.remove(m_discoveryComponents.remove(getKey(clusterGroupId, 
"",
+                    serviceGroupId)));
+                return true;
+            }
+            return false;
+        }
+        finally {
+            m_discoveryComponentsLock.writeLock().unlock();
+        }
+    }
+
+    public boolean createDistribution(String clusterGroupId, String 
serviceGroupId) {
+        // FIXME improve checks
+
+        Component distributionComponent =
+            m_dependencyManager
+                .createComponent()
+                .setInterface(DistributionService.class.getName(), null)
+                .setImplementation(new DistributionServiceImpl(clusterGroupId, 
serviceGroupId))
+                .add(
+                    m_dependencyManager.createServiceDependency()
+                        .setService(FabricManagerService.class)
+                        .setRequired(true));
+
+        m_distributionComponentsLock.writeLock().lock();
+        try {
+            if (m_distributionComponents.containsKey(getKey(clusterGroupId, 
"", serviceGroupId))) {
+                
m_dependencyManager.remove(m_distributionComponents.remove(getKey(clusterGroupId,
 "",
+                    serviceGroupId)));
+            }
+            m_distributionComponents.put(getKey(clusterGroupId, "", 
serviceGroupId),
+                distributionComponent);
+            m_dependencyManager.add(distributionComponent);
+            return true;
+        }
+        finally {
+            m_distributionComponentsLock.writeLock().unlock();
+        }
+    }
+
+    public boolean removeDistribution(String clusterGroupId, String 
serviceGroupId) {
+        // FIXME improve checks
+
+        m_distributionComponentsLock.writeLock().lock();
+        try {
+            if (m_distributionComponents.containsKey(getKey(clusterGroupId, 
"", serviceGroupId))) {
+                
m_dependencyManager.remove(m_distributionComponents.remove(getKey(clusterGroupId,
 "",
+                    serviceGroupId)));
+                return true;
+            }
+            return false;
+        }
+        finally {
+            m_distributionComponentsLock.writeLock().unlock();
+        }
+    }
+
+    private String getKey(String clusterGroupId, String clusterMemberId, 
String serviceGroupId) {
+        return clusterGroupId + "#" + clusterMemberId + "#" + serviceGroupId;
+    }
+}

Reply via email to