Author: bdekruijff at gmail.com
Date: Mon Jan  3 17:18:44 2011
New Revision: 555

Log:
[sandbox] servicegroups / depman / javadoc / errorchecking and more

Added:
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberUtilities.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
      - copied, changed from r553, 
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java
   
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/ChannelMemberUtilitiesTest.java
Removed:
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java
Modified:
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
   
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
    (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
    Mon Jan  3 17:18:44 2011
@@ -16,11 +16,90 @@
  */
 package org.amdatu.core.fabric.cluster;
 
+/**
+ * Service interface for the Amdatu Fabric clustering and communication
+ * component. A service registration for this component manages the
+ * lifecycle for a specific unique <code>ClusterChannel</code> that
+ * provides discovery and message based communication with other Amdatu
+ * nodes on the same channel. A service instance has a unique
+ * <code>ClusterChannel</code> and a <code>ChannelMember</code> property
+ * unique to that channel. These properties are published as service
+ * registration properties along with any additional configuration that
+ * may be specific to the underlying implementation.
+ * <p/>
+ * All interaction with a <code>ClusterMemberService</code> is asynchronous
+ * and flows through the OSGi <code>EventAdmin</code> service. Messages
+ * can be posted to the clusterchannel specific SEND topic for delivery
+ * to the cluster. Messages received from the cluster will by default
+ * be posted on the clusterchannel specific RECIEVE topic. If the message
+ * class implements the <code>LocalTopicMessage</code> interface the
+ * topic specified by the message itself takes precedence.
+ * <p/>
+ * For optimization purposes basic message routing is supported through
+ * the <code>RoutableMessage</code> base class. Any message object extending
+ * this class can provide optional target information. In addition it will
+ * have origin information injected upon sending providing the receiving
+ * end with the information required to route a message back.
+ * <p/>
+ * Sample service registration:
+ * <pre>
+ * objectClass = org.amdatu.core.fabric.cluster.ClusterMemberService
+ * org.amdatu.fabric.cluster.CLUSTERGROUP = cluster-1
+ * org.amdatu.fabric.cluster.CLUSTERMEMBER = member-1
+ * org.amdatu.fabric.cluster.tribes.args = -port, 8881, -mport, 8888
+ * service.id = 922
+ * </pre>
+ * 
+ */
 public interface ClusterMemberService {
 
-    String CLUSTER_CLUSTERGROUP_PROP = 
"org.amdatu.fabric.cluster.CLUSTERGROUP";
-    String CLUSTER_CLUSTERMEMBER_PROP = 
"org.amdatu.fabric.cluster.CLUSTERMEMBER";
+    /**
+     * OSGi service registration property name that is used to publish the
+     * clusterchannel <code>String</code> value for this service. This may be
+     * used for service selection.
+     */
+    String SERVICE_CLUSTERCHANNEL_PROPERTY = 
"org.amdatu.fabric.cluster.CLUSTERGROUP";
+
+    /**
+     * OSGi service registration property name that is used to publish the
+     * clustermember <code>String</code> value for this service. This may be
+     * used for service selection.
+     */
+    String SERVICE_CLUSTERMEMBER_PROPERTY = 
"org.amdatu.fabric.cluster.CLUSTERMEMBER";
+
+    /**
+     * OSGi service registration property name that is used to publish the
+     * configuration properties <code>Dictionary&lt;String, Object&gt;</code>
+     * value for this service. This may be used for service selection.
+     */
+    String SERVICE_CONFIGURATION_PROPERTY = 
"org.amdatu.fabric.cluster.CONFIGURATION";
+
+    /**
+     * <code>String</code> value to be replaced by the appropriate 
clusterchannel
+     * value in topic templates.
+     */
+    String EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER = 
"[ClusterChannel]";
+
+    /**
+     * <code>EventAdmin</code> topic template for posting message events that
+     * should be broadcast by the <code>ClusterMemberService</code>. Note
+     * that <code>[ClusterChannel]</code> should be replaced by the appropriate
+     * clusterchannel value.
+     */
+    String EVENT_SEND_TOPIC_TEMPLATE = "org/amdatu/fabric/cluster/SEND/"
+        + EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER;
+
+    /**
+     * <code>EventAdmin</code> topic template to which messages received by the
+     * <code>ClusterMemberService</code> are posted as events. Note that
+     * <code>[ClusterChannel]</code> should be replaced by the appropriate 
value.
+     */
+    String EVENT_RECIEVE_TOPIC_TEMPLATE = "org/amdatu/fabric/cluster/RECIEVE/"
+        + EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER;
 
-    String EVENT_TOPIC_BROADCAST = "org/amdatu/fabric/cluster/BROADCAST";
+    /**
+     * <code>Event</code> properties key where messages are (to be) stored in
+     * events posted to or received by the <code>ClusterMemberService</code>.
+     */
     String EVENT_MESSAGE_PROPERTY = "org/amdatu/fabric/cluster/MESSAGE";
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java
       (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java
       Mon Jan  3 17:18:44 2011
@@ -17,9 +17,17 @@
 package org.amdatu.core.fabric.cluster;
 
 /**
- * Interface for messages that want to specify the EventAdmin topic that the
- * ClusterMemberService should post them on at the receiver end.
+ * Interface for messages that want to specify the <code>EventAdmin</code>
+ * topic that the <code>ClusterMemberService</code> should post them on
+ * at the receiver end.
  */
 public interface LocalTopicMessage {
+
+    /**
+     * Returns a valid <code>EventAdmin</code> topic that consumers may
+     * use to receive this message.
+     * 
+     * @return the topic
+     */
     String getLocalTopic();
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
 (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
 Mon Jan  3 17:18:44 2011
@@ -1,13 +1,31 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
 package org.amdatu.core.fabric.cluster;
 
 import java.io.Serializable;
 
 /**
- * Base class for messages that want to enable routing support. Target 
information may
- * be set to avoid a cluster wide broadcast. The origin information will be 
injected by
- * the ClusterMemberService upon broadcast/send.
+ * Base class for messages that want to enable 
<code>ClusterMemberService</code>
+ * routing support. Target information may be set to avoid a cluster wide
+ * broadcast. The origin information will be injected by the 
ClusterMemberService
+ * upon sending.
  * 
  * FIXME ServiceGroup is not a cluster concept
+ * FIXME reduce member names to save bytes?
  */
 public abstract class RoutableMessage implements Serializable {
 
@@ -79,5 +97,4 @@
     public final void setTargetServiceGroup(String serviceGroup) {
         m_targetServiceGroup = serviceGroup;
     }
-
 }
\ No newline at end of file

Added: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberUtilities.java
==============================================================================
--- (empty file)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberUtilities.java
 Mon Jan  3 17:18:44 2011
@@ -0,0 +1,49 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.cluster.internal;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.amdatu.core.fabric.cluster.ClusterMemberService;
+
+public final class ClusterMemberUtilities {
+
+    private final static Pattern m_validNamePattern = 
Pattern.compile("[a-zA-Z0-9-_]+");
+
+    public static String getClusterChannelSendTopic(final String 
clusterChannel) {
+        return ClusterMemberService.EVENT_SEND_TOPIC_TEMPLATE.replace(
+            
ClusterMemberService.EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER, 
clusterChannel);
+    }
+
+    public static String getClusterChannelReceiveTopic(final String 
clusterChannel) {
+        return ClusterMemberService.EVENT_RECIEVE_TOPIC_TEMPLATE.replace(
+            
ClusterMemberService.EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER, 
clusterChannel);
+    }
+
+    public static boolean isValidChannelName(final String channelName) {
+        if (channelName == null || "".equals(channelName)) {
+            return false;
+        }
+        Matcher matcher = m_validNamePattern.matcher(channelName);
+        return matcher.matches();
+    }
+
+    public static boolean isValidMemberName(final String memberName) {
+        return isValidChannelName(memberName);
+    }
+}

Copied: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
 (from r553, 
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java)
==============================================================================
--- 
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java
       (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
        Mon Jan  3 17:18:44 2011
@@ -27,6 +27,7 @@
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
 import org.amdatu.core.fabric.cluster.LocalTopicMessage;
 import org.amdatu.core.fabric.cluster.RoutableMessage;
+import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
 import org.apache.felix.dm.Component;
 import org.apache.felix.dm.DependencyManager;
 import org.apache.felix.dm.ServiceDependency;
@@ -39,14 +40,17 @@
 /**
  * I manage cluster state
  */
-public abstract class BaseClusterMemberService implements ClusterMemberService 
{
+public abstract class ClusterMemberServiceBase implements ClusterMemberService 
{
 
     private final Map<String, ClusterMember> m_clusterMembers = new 
HashMap<String, ClusterMember>();
     private final ReentrantReadWriteLock m_clusterMembersLock = new 
ReentrantReadWriteLock();
 
     private final String m_clusterId;
     private final String m_memberId;
-    private final Map<String, Object> m_properties;
+    private final Dictionary<String, Object> m_properties;
+
+    private final String m_recieveEventTopic;
+    private final String m_sendEventTopic;
 
     // injected
     private volatile DependencyManager m_dependencyManager;
@@ -60,11 +64,11 @@
      * Constructors
      ********************************************************/
 
-    public BaseClusterMemberService(String clusterGroupId, String 
clusterMemberId,
+    public ClusterMemberServiceBase(String clusterGroupId, String 
clusterMemberId,
         Dictionary<String, Object> properties) {
         m_clusterId = clusterGroupId;
         m_memberId = clusterMemberId;
-        m_properties = new HashMap<String, Object>();
+        m_properties = new Hashtable<String, Object>();
         if (properties != null) {
             Enumeration<String> enumeration = properties.keys();
             while (enumeration.hasMoreElements()) {
@@ -72,6 +76,8 @@
                 m_properties.put(key, properties.get(key));
             }
         }
+        m_recieveEventTopic = 
ClusterMemberUtilities.getClusterChannelReceiveTopic(m_clusterId);
+        m_sendEventTopic = 
ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterId);
     }
 
     /********************************************************
@@ -79,27 +85,32 @@
      ********************************************************/
 
     public final synchronized void init() {
+
         @SuppressWarnings("unchecked")
         Dictionary<String, Object> serviceProps = 
m_component.getServiceProperties();
         if (serviceProps == null) {
             serviceProps = new Hashtable<String, Object>();
         }
-        serviceProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, 
m_clusterId);
-        serviceProps.put(ClusterMemberService.CLUSTER_CLUSTERMEMBER_PROP, 
m_memberId);
+        
+        serviceProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY, 
m_clusterId);
+        serviceProps.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY, 
m_memberId);
+        serviceProps.put(ClusterMemberService.SERVICE_CONFIGURATION_PROPERTY, 
m_properties);
         m_component.setServiceProperties(serviceProps);
 
         ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
         logServiceDependency.setService(LogService.class);
         logServiceDependency.setRequired(true);
+        logServiceDependency.setInstanceBound(true);
         m_component.add(logServiceDependency);
 
         ServiceDependency eventAdminServiceDependency = 
m_dependencyManager.createServiceDependency();
         eventAdminServiceDependency.setService(EventAdmin.class);
         eventAdminServiceDependency.setRequired(true);
+        eventAdminServiceDependency.setInstanceBound(true);
         m_component.add(eventAdminServiceDependency);
 
         Dictionary<String, Object> props = new Hashtable<String, Object>();
-        props.put(EventConstants.EVENT_TOPIC, new String[] { 
EVENT_TOPIC_BROADCAST + "/" + m_clusterId });
+        props.put(EventConstants.EVENT_TOPIC, new String[] { m_sendEventTopic 
});
         m_broadcastEventHandlerComponent = 
m_dependencyManager.createComponent();
         
m_broadcastEventHandlerComponent.setInterface(EventHandler.class.getName(), 
props);
         m_broadcastEventHandlerComponent.setImplementation(new 
BroadcastEventHandler());
@@ -123,23 +134,27 @@
     }
 
     /********************************************************
-     * ClusterMemberService
+     * Protected methods
      ********************************************************/
 
-    public final String getClusterId() {
+    protected final LogService getLogService() {
+        return m_logService;
+    }
+
+    protected final String getClusterId() {
         return m_clusterId;
     }
 
-    public final String getMemberId() {
+    protected final String getMemberId() {
         return m_memberId;
     }
 
-    public final Dictionary<String, Object> getProperties() {
-        return new Hashtable<String, Object>(m_properties);
+    protected final Dictionary<String, Object> getProperties() {
+        return m_properties;
 
     }
 
-    public final ClusterMember[] getClusterMembers() {
+    protected final ClusterMember[] getClusterMembers() {
         m_clusterMembersLock.readLock().lock();
         try {
             return m_clusterMembers.values().toArray(new 
ClusterMember[m_clusterMembers.size()]);
@@ -149,7 +164,7 @@
         }
     }
 
-    public final ClusterMember getClusterMember(String memberId) {
+    protected final ClusterMember getClusterMember(final String memberId) {
         m_clusterMembersLock.readLock().lock();
         try {
             return m_clusterMembers.get(memberId);
@@ -159,14 +174,6 @@
         }
     }
 
-    /********************************************************
-     * for implementing concrete classes
-     ********************************************************/
-
-    protected final LogService getLogService() {
-        return m_logService;
-    }
-
     protected final void addClusterMember(ClusterMember clusterMember) {
         m_clusterMembersLock.writeLock().lock();
         try {
@@ -177,10 +184,10 @@
         }
     }
 
-    protected final void removeClusterMember(ClusterMember clusterMember) {
+    protected final void removeClusterMember(String memberId) {
         m_clusterMembersLock.writeLock().lock();
         try {
-            m_clusterMembers.remove(clusterMember.getMemberId());
+            m_clusterMembers.remove(memberId);
         }
         finally {
             m_clusterMembersLock.writeLock().unlock();
@@ -190,15 +197,20 @@
     protected final void dispatchMessage(Object message) {
         Dictionary<String, Object> props = new Hashtable<String, Object>();
         props.put(EVENT_MESSAGE_PROPERTY, message);
-        // FIXME DO NOT REBROADCAST
-        String topic = EVENT_TOPIC_BROADCAST;
         if (message instanceof LocalTopicMessage) {
-            topic = ((LocalTopicMessage) message).getLocalTopic();
+            String topic = ((LocalTopicMessage) message).getLocalTopic();
+            Event broadCastEvent = new Event(topic, props);
+            m_eventAdmin.postEvent(broadCastEvent);
+            return;
         }
-        Event broadCastEvent = new Event(topic, props);
+        Event broadCastEvent = new Event(m_recieveEventTopic, props);
         m_eventAdmin.postEvent(broadCastEvent);
     }
 
+    /********************************************************
+     * Abstract methods
+     ********************************************************/
+
     protected abstract void onInit();
 
     protected abstract void onDestroy();
@@ -212,7 +224,7 @@
     protected abstract void doSend(ClusterMember[] clusterMember, Object 
message);
 
     /********************************************************
-     * helper classes
+     * Helper classes
      ********************************************************/
 
     class BroadcastEventHandler implements EventHandler {
@@ -248,14 +260,10 @@
 
                 if (clusterMember != null) {
                     doSend(new ClusterMember[] { clusterMember }, message);
+                    return;
                 }
-                else {
-                    doBroadcast(message);
-                }
-            }
-            else {
-                doBroadcast(message);
             }
+            doBroadcast(message);
         }
     }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
   (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
   Mon Jan  3 17:18:44 2011
@@ -22,16 +22,19 @@
 import java.io.Serializable;
 import java.util.Dictionary;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMember;
+import org.amdatu.core.fabric.cluster.ClusterMemberService;
 import org.amdatu.core.fabric.cluster.internal.ClusterMemberImpl;
 import org.amdatu.core.fabric.cluster.internal.tribes.ChannelCreator;
-import org.amdatu.core.fabric.cluster.service.BaseClusterMemberService;
+import org.amdatu.core.fabric.cluster.service.ClusterMemberServiceBase;
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelListener;
@@ -40,12 +43,15 @@
 import org.apache.catalina.tribes.MembershipListener;
 import org.osgi.service.log.LogService;
 
-public final class TribesClusterMemberServiceImpl extends 
BaseClusterMemberService {
+public final class TribesClusterMemberServiceImpl extends 
ClusterMemberServiceBase {
 
     public static final String CLUSTER_TRIBES_ARGS_PROP = 
"org.amdatu.fabric.cluster.tribes.args";
 
-    private final Map<ClusterMember, Member> m_clusterMemberMembers = new 
HashMap<ClusterMember, Member>();
-    private final ReentrantReadWriteLock m_clusterMemberMembersLock = new 
ReentrantReadWriteLock();
+    private final Map<String, Member> m_memberIdMembers = new HashMap<String, 
Member>();
+    private final ReentrantReadWriteLock m_memberIdMembersLock = new 
ReentrantReadWriteLock();
+
+    private final Set<Member> m_faultyMembers = new HashSet<Member>();
+    private final ReentrantReadWriteLock m_faultyMembersLock = new 
ReentrantReadWriteLock();
 
     private volatile ManagedChannel m_managedChannel;
 
@@ -76,7 +82,8 @@
                     CLUSTER_TRIBES_ARGS_PROP));
 
             Properties props = new Properties();
-            props.setProperty(CLUSTER_CLUSTERMEMBER_PROP, getMemberId());
+            props.setProperty(SERVICE_CLUSTERCHANNEL_PROPERTY, getClusterId());
+            props.setProperty(SERVICE_CLUSTERMEMBER_PROPERTY, getMemberId());
             m_managedChannel.addMembershipListener(new 
TribesMembershipListener());
             m_managedChannel.addChannelListener(new TribesChannelListener());
             
m_managedChannel.getMembershipService().setPayload(getPayload(props));
@@ -103,66 +110,69 @@
 
     @Override
     public void doBroadcast(Object message) {
-        if (message instanceof Serializable) {
-            try {
-                Member[] members = m_managedChannel.getMembers();
-                if (members.length > 0) {
-                    m_managedChannel.send(members, (Serializable) message,
-                        Channel.SEND_OPTIONS_ASYNCHRONOUS);
-                }
-                else {
-                    getLogService().log(
-                        LogService.LOG_WARNING,
-                        "Dropping message during broadcast because there are 
no members on my channel: "
+        if (!(message instanceof Serializable)) {
+            getLogService().log(LogService.LOG_ERROR,
+                "Dropping message during broadcast because is is not 
Serializable: "
+                    + message.toString());
+            return;
+        }
+        m_memberIdMembersLock.readLock().lock();
+        try {
+            if (m_memberIdMembers.size() == 0) {
+                getLogService().log(LogService.LOG_WARNING,
+                        "Dropping message during send because there are no 
active members on my channel: "
                             + message.toString());
-                }
-            }
-            catch (ChannelException e) {
-                getLogService().log(LogService.LOG_ERROR, "Exception during 
send on managed channel", e);
+                return;
             }
+            Member[] members = m_memberIdMembers.values().toArray(new 
Member[m_memberIdMembers.size()]);
+            m_managedChannel.send(members, (Serializable) message, 
Channel.SEND_OPTIONS_ASYNCHRONOUS);
         }
-        else {
+        catch (ChannelException e) {
             getLogService().log(LogService.LOG_ERROR,
-                "Dropping message of type " + message.getClass().getName() + " 
because it is not Serializable: "
-                    + message.toString());
+                "Exception during send on managed channel: " + 
message.toString(), e);
+        }
+        finally {
+            m_memberIdMembersLock.readLock().unlock();
         }
     }
 
     @Override
     public void doSend(ClusterMember[] clusterMembers, Object message) {
-        if (message instanceof Serializable) {
-            try {
-                List<Member> members = new LinkedList<Member>();
-                m_clusterMemberMembersLock.readLock().lock();
-                try {
-                    for (ClusterMember clusterMember : clusterMembers) {
-                        Member member = 
m_clusterMemberMembers.get(clusterMember);
-                        if (member != null) {
-                            members.add(member);
-                        }
-                    }
-                }
-                finally {
-                    m_clusterMemberMembersLock.readLock().unlock();
-                }
-                if (members.size() > 0) {
-                    m_managedChannel.send(members.toArray(new 
Member[members.size()]), (Serializable) message,
-                        Channel.SEND_OPTIONS_ASYNCHRONOUS);
-                }
-                else {
-                    getLogService().log(LogService.LOG_WARNING,
-                        "Dropping message during send because there are no 
matching members on my channel: "
-                            + message.toString());
+        if (!(message instanceof Serializable)) {
+            getLogService().log(LogService.LOG_ERROR,
+                "Dropping message during broadcast because is is not 
Serializable: "
+                    + message.toString());
+            return;
+        }
+        List<Member> memberList = new LinkedList<Member>();
+        m_memberIdMembersLock.readLock().lock();
+        try {
+            for (ClusterMember clusterMember : clusterMembers) {
+                Member member = 
m_memberIdMembers.get(clusterMember.getMemberId());
+                if (member != null) {
+                    memberList.add(member);
                 }
             }
-            catch (ChannelException e) {
-                getLogService().log(LogService.LOG_ERROR, "Exception during 
send on managed channel", e);
+            if (memberList.size() == 0) {
+                getLogService().log(LogService.LOG_WARNING,
+                            "Dropping message during send because there are no 
matching members on my channel: "
+                                + message.toString());
+                return;
             }
+            Member[] members = memberList.toArray(new 
Member[memberList.size()]);
+            m_managedChannel.send(members, (Serializable) message, 
Channel.SEND_OPTIONS_ASYNCHRONOUS);
+        }
+        catch (ChannelException e) {
+            getLogService().log(LogService.LOG_ERROR,
+                "Exception during send on managed channel: " + 
message.toString(), e);
+        }
+        finally {
+            m_memberIdMembersLock.readLock().unlock();
         }
     }
 
     /********************************************************
-     * Utility methods
+     * Private methods
      ********************************************************/
 
     private byte[] getPayload(Properties props) throws IOException {
@@ -178,6 +188,48 @@
         return props;
     }
 
+    private void addMemberIdMember(String clusterMember, Member member) {
+        m_memberIdMembersLock.writeLock().lock();
+        try {
+            m_memberIdMembers.put(clusterMember, member);
+        }
+        finally {
+            m_memberIdMembersLock.writeLock().unlock();
+        }
+        addClusterMember(new ClusterMemberImpl(clusterMember));
+    }
+
+    private void removeMemberIdMember(String clusterMember) {
+        m_memberIdMembersLock.writeLock().lock();
+        try {
+            m_memberIdMembers.remove(clusterMember);
+        }
+        finally {
+            m_memberIdMembersLock.writeLock().unlock();
+        }
+        removeClusterMember(clusterMember);
+    }
+
+    private void addFaultyMember(Member member) {
+        m_faultyMembersLock.writeLock().lock();
+        try {
+            m_faultyMembers.add(member);
+        }
+        finally {
+            m_faultyMembersLock.writeLock().unlock();
+        }
+    }
+
+    private boolean removeFaultyMember(Member member) {
+        m_faultyMembersLock.writeLock().lock();
+        try {
+            return m_faultyMembers.remove(member);
+        }
+        finally {
+            m_faultyMembersLock.writeLock().unlock();
+        }
+    }
+
     /********************************************************
      * Helper classes
      ********************************************************/
@@ -186,17 +238,24 @@
 
         public void memberAdded(Member member) {
             try {
-                getLogService().log(LogService.LOG_DEBUG, "Member added: " + 
member.toString());
-                ClusterMember clusterMember = new 
ClusterMemberImpl(getProperties(member.getPayload())
-                    .getProperty(CLUSTER_CLUSTERMEMBER_PROP));
-                m_clusterMemberMembersLock.writeLock().lock();
-                try {
-                    m_clusterMemberMembers.put(clusterMember, member);
-                    addClusterMember(clusterMember);
+                Properties props = getProperties(member.getPayload());
+                String groupchannel = (String) 
props.get(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY);
+                if (groupchannel == null || 
!groupchannel.equals(getClusterId())) {
+                    getLogService().log(LogService.LOG_ERROR,
+                        "Member joined with invalid channelid! Will ignore it: 
" + member.toString());
+                    addFaultyMember(member);
+                    return;
                 }
-                finally {
-                    m_clusterMemberMembersLock.writeLock().unlock();
+                String channelmember = (String) 
props.get(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY);
+                if (channelmember == null || 
channelmember.equals(getMemberId())
+                    || getClusterMember(channelmember) != null) {
+                    getLogService().log(LogService.LOG_ERROR,
+                        "Member joined with invalid memberid! Will ignore it: 
" + member.toString());
+                    addFaultyMember(member);
+                    return;
                 }
+                addMemberIdMember(channelmember, member);
+                getLogService().log(LogService.LOG_DEBUG, "Member added: " + 
member.toString());
             }
             catch (Exception e) {
                 getLogService().log(LogService.LOG_ERROR, "Exception while 
adding member: " + member.toString(), e);
@@ -205,27 +264,12 @@
 
         public void memberDisappeared(Member member) {
             try {
-                // FIXME use memberid to tuple map to reduce object creation
-                getLogService().log(LogService.LOG_DEBUG, "Member disappeared: 
" + member.toString());
-                String memberId = getProperties(member.getPayload())
-                    .getProperty(CLUSTER_CLUSTERMEMBER_PROP);
-                ClusterMember toBeRemoved = null;
-                m_clusterMemberMembersLock.writeLock().lock();
-                try {
-                    for (ClusterMember clusterMember : 
m_clusterMemberMembers.keySet()) {
-                        if (clusterMember.getMemberId().equals(memberId)) {
-                            toBeRemoved = clusterMember;
-                            break;
-                        }
-                    }
-                    if (toBeRemoved != null) {
-                        removeClusterMember(toBeRemoved);
-                        m_clusterMemberMembers.remove(toBeRemoved);
-                    }
-                }
-                finally {
-                    m_clusterMemberMembersLock.writeLock().unlock();
+                if (removeFaultyMember(member)) {
+                    getLogService().log(LogService.LOG_DEBUG, "Faulty member 
disappeared: " + member.toString());
+                    return;
                 }
+                String memberId = 
getProperties(member.getPayload()).getProperty(SERVICE_CLUSTERMEMBER_PROPERTY);
+                removeMemberIdMember(memberId);
             }
             catch (Exception e) {
                 getLogService().log(LogService.LOG_ERROR, "Exception while 
removing member: " + member.toString(), e);
@@ -236,6 +280,17 @@
     class TribesChannelListener implements ChannelListener {
 
         public boolean accept(Serializable message, Member member) {
+            m_faultyMembersLock.readLock().lock();
+            try {
+                if (m_faultyMembers.contains(member)) {
+                    getLogService().log(LogService.LOG_WARNING,
+                        "Dropping message recieved from faulty member: " + 
member.toString());
+                    return false;
+                }
+            }
+            finally {
+                m_faultyMembersLock.readLock().unlock();
+            }
             return true;
         }
 
@@ -243,23 +298,4 @@
             dispatchMessage(message);
         }
     }
-
-    static class ClusterMemberMemberTuple {
-
-        private final ClusterMember m_clusterMember;
-        private final Member m_member;
-
-        public ClusterMemberMemberTuple(final ClusterMember clusterMember, 
final Member member) {
-            m_clusterMember = clusterMember;
-            m_member = member;
-        }
-
-        public ClusterMember getClusterMember() {
-            return m_clusterMember;
-        }
-
-        public Member getMember() {
-            return m_member;
-        }
-    }
 }
\ No newline at end of file

Modified: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
==============================================================================
--- sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java  (original)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java  Mon Jan  3 17:18:44 2011
@@ -30,11 +30,17 @@
 
         manager.add(createComponent()
             .setInterface(FabricManagerService.class.getName(), null)
-            .setImplementation(new FabricManagerServiceImpl())
+            .setFactory(new FabricManagerServiceFactory(), "getInstance")
             
.add(createConfigurationDependency().setPid(FabricManagerService.CONFIGURATION_PID)));
     }
 
     @Override
     public void destroy(BundleContext context, DependencyManager manager) 
throws Exception {
     }
+
+    static class FabricManagerServiceFactory {
+        public FabricManagerService getInstance() {
+            return new FabricManagerServiceImpl();
+        }
+    }
 }

Modified: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
==============================================================================
--- sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java   (original)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java   Mon Jan  3 17:18:44 2011
@@ -29,6 +29,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
 import org.amdatu.core.fabric.remote.ServiceEndPoint;
 import org.amdatu.core.fabric.remote.service.DistributionServiceImpl;
 import org.apache.felix.dm.Component;
@@ -71,9 +72,9 @@
      * Constructors
      ********************************************************/
 
-  public LocalServiceInvocationHandler(String clusterGroupId, String 
serviceGroupId,
-  ServiceEndPoint serviceEndpoint,
-  Class<?>[] interfaceClasses) {
+    public LocalServiceInvocationHandler(String clusterGroupId, String 
serviceGroupId,
+        ServiceEndPoint serviceEndpoint,
+        Class<?>[] interfaceClasses) {
         m_clusterGroupId = clusterGroupId;
         m_serviceGroupId = serviceGroupId;
         m_serviceEndpoint = serviceEndpoint;
@@ -97,16 +98,17 @@
      ********************************************************/
 
     public synchronized void init() {
-        ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
-        logServiceDependency.setService(LogService.class);
-        logServiceDependency.setRequired(true);
-        m_component.add(logServiceDependency);
-
         ServiceDependency eventAdminServiceDependency = 
m_dependencyManager.createServiceDependency();
         eventAdminServiceDependency.setService(EventAdmin.class);
         eventAdminServiceDependency.setRequired(true);
+        eventAdminServiceDependency.setInstanceBound(true);
         m_component.add(eventAdminServiceDependency);
 
+        ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
+        logServiceDependency.setService(LogService.class);
+        logServiceDependency.setRequired(false);
+        m_component.add(logServiceDependency);
+
         Dictionary<String, Object> props = new Hashtable<String, Object>();
         props.put(EventConstants.EVENT_TOPIC,
             new String[] { 
DistributionUtilities.getLocalResponseTopic(m_clusterGroupId, m_serviceGroupId) 
});
@@ -144,7 +146,7 @@
             Dictionary<String, Object> eventPayload = new Hashtable<String, 
Object>();
             eventPayload.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, 
message);
             Event event =
-                new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" + 
m_clusterGroupId,
+                new 
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
                     eventPayload);
             m_eventAdmin.postEvent(event);
 

Modified: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
==============================================================================
--- sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java     (original)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java     Mon Jan  3 17:18:44 2011
@@ -25,6 +25,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
 import org.amdatu.core.fabric.remote.DiscoveryService;
 import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
 import org.amdatu.core.fabric.remote.RemoteServiceEndPoint;
@@ -46,6 +47,7 @@
 /**
  * I keep track of local RemotableServiceEndpoint services and publish them in 
the cluster
  * I listen to the cluster and publish local RemoteServiceEndpoint services 
for them
+ * 
  * TODO support LOOKUP requests for more fine grained discovery (use 
ListenerHook)
  */
 public final class DiscoveryServiceImpl implements DiscoveryService {
@@ -81,37 +83,41 @@
      ********************************************************/
 
     public synchronized void init() {
+
         @SuppressWarnings("unchecked")
         Dictionary<String, Object> discoveryProps = 
m_component.getServiceProperties();
         if (discoveryProps == null) {
             discoveryProps = new Hashtable<String, Object>();
         }
-        discoveryProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, 
m_clusterGroupId);
+        
discoveryProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY, 
m_clusterGroupId);
         discoveryProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP, 
m_serviceGroupId);
         m_component.setServiceProperties(discoveryProps);
 
-        ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
-        logServiceDependency.setService(LogService.class);
-        logServiceDependency.setRequired(true);
-        m_component.add(logServiceDependency);
-
         ServiceDependency eventAdminServiceDependency = 
m_dependencyManager.createServiceDependency();
         eventAdminServiceDependency.setService(EventAdmin.class);
         eventAdminServiceDependency.setRequired(true);
+        eventAdminServiceDependency.setInstanceBound(true);
         m_component.add(eventAdminServiceDependency);
 
         ServiceDependency clusterMemberDependency = 
m_dependencyManager.createServiceDependency();
         clusterMemberDependency
             .setService(
                 ClusterMemberService.class,
-                "(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "=" + 
m_clusterGroupId + ")")
+                "(" + ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + 
"=" + m_clusterGroupId + ")")
             .setRequired(true);
+        clusterMemberDependency.setInstanceBound(true);
         m_component.add(clusterMemberDependency);
 
+        ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
+        logServiceDependency.setService(LogService.class);
+        logServiceDependency.setRequired(true);
+        logServiceDependency.setInstanceBound(true);
+        m_component.add(logServiceDependency);
+
         ServiceDependency remotableServiceEndpointsDependecy = 
m_dependencyManager.createServiceDependency();
         remotableServiceEndpointsDependecy
                .setService(RemotableServiceEndpoint.class,
-                   "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "="
+                   "(&(" + 
ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
                        + m_clusterGroupId + ")(" + 
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
                        + m_serviceGroupId + "))")
                .setCallbacks("remotableServiceEndPointAdded", 
"remotableServiceEndPointRemoved")
@@ -133,7 +139,7 @@
     public synchronized void start() {
         m_logService.log(LogService.LOG_INFO, "Starting " + toString());
         m_dependencyManager.add(m_discoveryEventHandlerComponent);
-        m_evenAdmin.postEvent(createdDiscoveryEvent());
+        m_evenAdmin.postEvent(createDiscoveryEvent());
     }
 
     public synchronized void stop() {
@@ -188,12 +194,12 @@
      * Private methods
      ********************************************************/
 
-    private Event createdDiscoveryEvent() {
+    private Event createDiscoveryEvent() {
         Dictionary<String, Object> eventProps = new Hashtable<String, 
Object>();
         eventProps.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY,
             new EndpointDiscoveryMessage(m_clusterGroupId, m_serviceGroupId));
         Event discoveryEvent =
-            new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" + 
m_clusterGroupId,
+            new 
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
                 eventProps);
         return discoveryEvent;
     }
@@ -202,7 +208,7 @@
         Dictionary<String, Object> props = new Hashtable<String, Object>();
         props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, new 
EndpointPublishMessage(serviceEndPoint));
         Event event =
-            new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" + 
m_clusterGroupId,
+            new 
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
                 props);
         return event;
     }
@@ -211,7 +217,7 @@
         Dictionary<String, Object> props = new Hashtable<String, Object>();
         props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, new 
EndpointDepublishMessage(serviceEndPoint));
         Event event =
-            new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" + 
m_clusterGroupId,
+            new 
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
                 props);
         return event;
     }
@@ -269,7 +275,7 @@
                 return;
             }
             Dictionary<String, Object> distributionProps = new 
Hashtable<String, Object>();
-            
distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
+            
distributionProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
                 m_clusterGroupId);
             distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
                 m_serviceGroupId);
@@ -279,7 +285,7 @@
                     .setImplementation(new 
RemoteServiceEndPointImpl(serviceEndPoint));
             
serviceComponent.add(m_dependencyManager.createServiceDependency().setService(
                 DiscoveryService.class,
-                "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "="
+                "(&(" + ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + 
"="
                     + m_clusterGroupId + ")(" + 
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
                     + m_serviceGroupId + "))")
                 .setRequired(true));

Modified: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
==============================================================================
--- sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java  (original)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java  Mon Jan  3 17:18:44 2011
@@ -26,6 +26,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
 import org.amdatu.core.fabric.remote.DiscoveryService;
 import org.amdatu.core.fabric.remote.DistributionService;
 import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
@@ -78,25 +79,26 @@
     private final String m_serviceGroupId;
 
     /********************************************************
-     * Constructors
+     * Constructor methods
      ********************************************************/
 
-        public DistributionServiceImpl(String clusterGroupId, String 
serviceGroupId) {
+    public DistributionServiceImpl(String clusterGroupId, String 
serviceGroupId) {
         m_clusterGroupId = clusterGroupId;
         m_serviceGroupId = serviceGroupId;
     }
 
     /********************************************************
-     * Service lifecycle
+     * Lifecycle methods
      ********************************************************/
 
     public synchronized void init() {
+
         @SuppressWarnings("unchecked")
         Dictionary<String, Object> distributionProps = 
m_component.getServiceProperties();
         if (distributionProps == null) {
             distributionProps = new Hashtable<String, Object>();
         }
-        distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, 
m_clusterGroupId);
+        
distributionProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY, 
m_clusterGroupId);
         distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP, 
m_serviceGroupId);
         
distributionProps.put(DistributionService.DISTRIBUTION_CONFIGS_SUPPORTED_PROP,
             DistributionService.DISTRIBUTION_CONFIGS_SUPPORTED);
@@ -104,24 +106,27 @@
             DistributionService.DISTRIBUTION_INTENTS_SUPPORTED);
         m_component.setServiceProperties(distributionProps);
 
-        ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
-        logServiceDependency.setService(LogService.class);
-        logServiceDependency.setRequired(true);
-        m_component.add(logServiceDependency);
-
         ServiceDependency eventAdminServiceDependency = 
m_dependencyManager.createServiceDependency();
         eventAdminServiceDependency.setService(EventAdmin.class);
         eventAdminServiceDependency.setRequired(true);
+        eventAdminServiceDependency.setInstanceBound(true);
         m_component.add(eventAdminServiceDependency);
 
         ServiceDependency clusterMemberDependency = 
m_dependencyManager.createServiceDependency();
         clusterMemberDependency
             .setService(
                 ClusterMemberService.class,
-                "(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "=" + 
m_clusterGroupId + ")")
+                "(" + ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + 
"=" + m_clusterGroupId + ")")
                 .setRequired(true);
+        clusterMemberDependency.setInstanceBound(true);
         m_component.add(clusterMemberDependency);
 
+        ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
+        logServiceDependency.setService(LogService.class);
+        logServiceDependency.setRequired(true);
+        logServiceDependency.setInstanceBound(true);
+        m_component.add(logServiceDependency);
+
         ServiceDependency remotableServicesDependecy = 
m_dependencyManager.createServiceDependency();
         remotableServicesDependecy
                .setService(
@@ -135,7 +140,7 @@
         ServiceDependency remoteServiceEndpointsDependecy = 
m_dependencyManager.createServiceDependency();
         remoteServiceEndpointsDependecy
                .setService(RemoteServiceEndPoint.class,
-                   "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "="
+                   "(&(" + 
ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
                        + m_clusterGroupId + ")(" + 
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
                        + m_serviceGroupId + "))")
                .setCallbacks("remoteServiceEndPointAdded", 
"remoteServiceEndPointRemoved")
@@ -172,6 +177,12 @@
 
     public void localRemotableServiceAdded(final ServiceReference 
serviceReference /* , Object Service */) {
         ServiceEndPoint serviceEndPoint = 
serviceEndPointFromServiceReference(serviceReference);
+        if (!isServiceEndpointForServiceGroup(serviceEndPoint)) {
+            m_logService
+                .log(LogService.LOG_DEBUG,
+                    "Ignoring ServiceEndpoint not for this serviceGroup: " + 
serviceEndPoint.toString());
+            return;
+        }
         if (!isServiceEndpointConfigurationSupported(serviceEndPoint)) {
             m_logService
                 .log(LogService.LOG_WARNING, "Unsupported ServiceEndPoint 
configuration " + serviceEndPoint.toString());
@@ -195,6 +206,12 @@
 
     public void localRemotableServiceRemoved(ServiceReference serviceReference 
/* , Object Service */) {
         ServiceEndPoint serviceEndPoint = 
serviceEndPointFromServiceReference(serviceReference);
+        if (!isServiceEndpointForServiceGroup(serviceEndPoint)) {
+            m_logService
+                .log(LogService.LOG_DEBUG,
+                    "Ignoring ServiceEndpoint not for this serviceGroup: " + 
serviceEndPoint.toString());
+            return;
+        }
         
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().lock();
         try {
             if 
(m_localServiceEndPointServiceReferenceComponents.containsKey(serviceEndPoint)) 
{
@@ -296,7 +313,7 @@
 
     private Component createRemotableEndPointComponent(final ServiceEndPoint 
serviceEndPoint) {
         Dictionary<String, Object> distributionProps = new Hashtable<String, 
Object>();
-        distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, 
m_clusterGroupId);
+        
distributionProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY, 
m_clusterGroupId);
         distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP, 
m_serviceGroupId);
         Component serviceComponent =
                 m_dependencyManager.createComponent()
@@ -307,7 +324,7 @@
                 .createServiceDependency()
                 .setService(
                     DistributionService.class,
-                    "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + 
"="
+                    "(&(" + 
ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
                         + m_clusterGroupId + ")(" + 
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
                         + m_serviceGroupId + "))")
                 .setRequired(true);
@@ -344,7 +361,7 @@
         props.remove(DistributionService.SERVICE_EXPORTED_INTERFACES_PROP);
         props.remove(DistributionService.SERVICE_EXPORTED_INTENTS_PROP);
         props.remove(DistributionService.SERVICE_EXPORTED_INTENTS_EXTRA_PROP);
-        props.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, 
m_clusterGroupId);
+        props.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY, 
m_clusterGroupId);
         props.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP, 
m_serviceGroupId);
         props.put(DistributionService.SERVICE_IMPORTED_PROP, "true");
         props.put(DistributionService.SERVICE_INTENTS_PROP, importedIntents);
@@ -358,7 +375,7 @@
                 .createServiceDependency()
                 .setService(
                     DistributionService.class,
-                    "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + 
"="
+                    "(&(" + 
ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
                         + m_clusterGroupId + ")(" + 
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
                         + m_serviceGroupId + "))")
                 .setRequired(true);
@@ -366,6 +383,29 @@
         return serviceComponent;
     }
 
+    private boolean isServiceEndpointForServiceGroup(ServiceEndPoint 
serviceEndpoint) {
+        Object serviceGroupProperty = 
serviceEndpoint.getProperties().get(DiscoveryService.REMOTE_SERVICEGROUPID_PROP);
+        if (serviceGroupProperty == null) {
+            return true;
+        }
+        if (serviceGroupProperty instanceof String) {
+            String serviceGroupValue = (String) serviceGroupProperty;
+            if ("".equals(serviceGroupValue) || 
serviceGroupValue.equals(m_serviceGroupId)) {
+                return true;
+            }
+            return false;
+        }
+        if (serviceGroupProperty instanceof String[]) {
+            String[] serviceGroupValues = (String[]) serviceGroupProperty;
+            for (String serviceGroupValue : serviceGroupValues) {
+                if (serviceGroupValue.equals(m_serviceGroupId)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     private boolean isServiceEndpointConfigurationSupported(ServiceEndPoint 
serviceEndpoint) {
         if 
(!DistributionUtilities.isConfigurationTypeSupported(serviceEndpoint.getProperties()))
 {
             System.err.println("No supported configuration type");
@@ -405,7 +445,7 @@
     private void recieveEndpointInvokeMessage(EndpointInvokeMessage 
endpointInvokeMessage) {
 
         ServiceEndPoint serviceEndPoint = 
endpointInvokeMessage.getServiceEndPoint();
-        //FIXME address this
+        // FIXME address this
         serviceEndPoint.setMemberId(null);
         Object serviceResponse = null;
 
@@ -493,7 +533,7 @@
             originMemberId, originServiceGroup, responsePayload));
 
         Event responseEvent =
-            new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" + 
m_clusterGroupId,
+            new 
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
                 eventPayload);
         return responseEvent;
     }
@@ -516,6 +556,7 @@
     }
 
     static class ServiceReferenceComponentTuple {
+
         private final ServiceReference m_serviceReference;
         private final Component m_component;
 

Modified: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
==============================================================================
--- sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java        (original)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java        Mon Jan  3 17:18:44 2011
@@ -24,6 +24,7 @@
 
 import org.amdatu.core.fabric.FabricManagerService;
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
 import 
org.amdatu.core.fabric.cluster.service.tribes.TribesClusterMemberServiceImpl;
 import org.amdatu.core.fabric.remote.DiscoveryService;
 import org.amdatu.core.fabric.remote.DistributionService;
@@ -35,7 +36,7 @@
 import org.osgi.service.cm.ConfigurationException;
 import org.osgi.service.log.LogService;
 
-public class FabricManagerServiceImpl implements FabricManagerService {
+public final class FabricManagerServiceImpl implements FabricManagerService {
 
     private final Map<String, Component> m_clusterMemberComponents = new 
HashMap<String, Component>();
     private final ReentrantReadWriteLock m_clusterMemberComponentsLock = new 
ReentrantReadWriteLock();
@@ -51,13 +52,21 @@
     private volatile LogService m_logService;
 
     /********************************************************
-     * Service lifecycle
+     * Constructor methods
+     ********************************************************/
+
+    public FabricManagerServiceImpl() {
+    }
+
+    /********************************************************
+     * Lifecycle methods
      ********************************************************/
 
     public final synchronized void init() {
         ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
         logServiceDependency.setService(LogService.class);
         logServiceDependency.setRequired(true);
+        logServiceDependency.setInstanceBound(true);
         m_component.add(logServiceDependency);
     }
 
@@ -73,7 +82,7 @@
     }
 
     /********************************************************
-     * ManagedService (controlled by dependencymanager)
+     * ManagedService methods
      ********************************************************/
 
     public synchronized void updated(Dictionary<String, Object> dictionary) 
throws ConfigurationException {
@@ -113,27 +122,33 @@
     }
 
     /********************************************************
-     * FabricManagerService interface
+     * FabricManagerService methods
      ********************************************************/
 
     public boolean createClusterMember(String clusterGroupId, String 
clusterMemberId,
         Dictionary<String, Object> properties) {
-        if (clusterMemberId == null || "".equals(clusterMemberId)) {
+        if (!ClusterMemberUtilities.isValidChannelName(clusterGroupId)) {
+            m_logService.log(LogService.LOG_ERROR, "Failed to create cluster. 
Invalid clustername: " + clusterGroupId);
+            return false;
+        }
+        if (!ClusterMemberUtilities.isValidMemberName(clusterMemberId)) {
+            m_logService.log(LogService.LOG_ERROR, "Failed to create cluster. 
Invalid membername: " + clusterMemberId);
             return false;
         }
 
         Dictionary<String, Object> svcProperties = new Hashtable<String, 
Object>();
-        svcProperties.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, 
clusterMemberId);
-        svcProperties.put(ClusterMemberService.CLUSTER_CLUSTERMEMBER_PROP, 
clusterMemberId);
+// svcProperties.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY, 
clusterMemberId);
+// svcProperties.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY, 
clusterMemberId);
 
         Component clusterMemberComponent =
-            m_dependencyManager.createComponent()
+            m_dependencyManager
+                .createComponent()
                 .setInterface(ClusterMemberService.class.getName(), properties)
-                .setImplementation(new 
TribesClusterMemberServiceImpl(clusterGroupId, clusterMemberId, properties))
+                .setFactory(new ClusterMemberServiceFactory(clusterGroupId, 
clusterMemberId, properties), "getInstance")
                 .add(
                     m_dependencyManager.createServiceDependency()
                         .setService(FabricManagerService.class)
-                        .setRequired(true));
+                        .setRequired(true).setInstanceBound(true));
 
         m_clusterMemberComponentsLock.writeLock().lock();
         try {
@@ -177,11 +192,11 @@
                 .createComponent()
                 .setInterface(DiscoveryService.class.getName(),
                     null)
-                .setImplementation(new DiscoveryServiceImpl(clusterGroupId, 
serviceGroupId))
+                .setFactory(new DiscoveryServiceFactory(clusterGroupId, 
serviceGroupId), "getInstance")
                 .add(
                     m_dependencyManager.createServiceDependency()
                         .setService(FabricManagerService.class)
-                        .setRequired(true));
+                        .setRequired(true).setInstanceBound(true));
 
         m_discoveryComponentsLock.writeLock().lock();
         try {
@@ -222,11 +237,11 @@
             m_dependencyManager
                 .createComponent()
                 .setInterface(DistributionService.class.getName(), null)
-                .setImplementation(new DistributionServiceImpl(clusterGroupId, 
serviceGroupId))
+                .setFactory(new DistributionServiceFactory(clusterGroupId, 
serviceGroupId), "getInstance")
                 .add(
                     m_dependencyManager.createServiceDependency()
                         .setService(FabricManagerService.class)
-                        .setRequired(true));
+                        .setRequired(true).setInstanceBound(true));
 
         m_distributionComponentsLock.writeLock().lock();
         try {
@@ -245,7 +260,7 @@
     }
 
     public boolean removeDistribution(String clusterGroupId, String 
serviceGroupId) {
-        // FIXME improve checks
+        // TODO improve checks
 
         m_distributionComponentsLock.writeLock().lock();
         try {
@@ -261,7 +276,63 @@
         }
     }
 
+    /********************************************************
+     * Private methods
+     ********************************************************/
+
+    /**
+     * Builds a composite key by joining the three identifiers with a '#' separator,
+     * in the order clusterGroupId, clusterMemberId, serviceGroupId.
+     * <p>
+     * NOTE(review): the identifiers are concatenated as-is; if an id could itself
+     * contain '#', two different triples could yield the same key - confirm that
+     * the ids are validated elsewhere.
+     *
+     * @param clusterGroupId first key segment
+     * @param clusterMemberId second key segment
+     * @param serviceGroupId third key segment
+     * @return the three segments joined by '#'
+     */
     private String getKey(String clusterGroupId, String clusterMemberId, 
String serviceGroupId) {
         return clusterGroupId + "#" + clusterMemberId + "#" + serviceGroupId;
     }
+
+    /********************************************************
+     * Helper classes
+     ********************************************************/
+
+    /**
+     * Factory that creates {@link TribesClusterMemberServiceImpl} instances for a
+     * fixed cluster group id, cluster member id and properties dictionary.
+     * <p>
+     * NOTE(review): presumably registered with the dependency manager through
+     * setFactory(..., "getInstance") like the discovery and distribution
+     * factories; that registration is not part of this hunk - confirm.
+     */
+    static class ClusterMemberServiceFactory {
+
+        private final String m_clusterGroupId;
+        private final String m_clusterMemberId;
+        // Kept by reference (no defensive copy), so later changes made by the
+        // caller to the dictionary are visible to instances created afterwards.
+        private final Dictionary<String, Object> m_properties;
+
+        /**
+         * Captures the constructor arguments that every created service receives.
+         *
+         * @param clusterGroupId id passed on to the created service
+         * @param clusterMemberId id passed on to the created service
+         * @param properties dictionary passed on to the created service
+         */
+        public ClusterMemberServiceFactory(String clusterGroupId, String 
clusterMemberId,
+            Dictionary<String, Object> properties) {
+            m_clusterGroupId = clusterGroupId;
+            m_clusterMemberId = clusterMemberId;
+            m_properties = properties;
+        }
+
+        /**
+         * Creates a new service instance on every call.
+         *
+         * @return a new {@link TribesClusterMemberServiceImpl} built from the
+         *         captured ids and properties
+         */
+        public ClusterMemberService getInstance() {
+            return new TribesClusterMemberServiceImpl(m_clusterGroupId, 
m_clusterMemberId, m_properties);
+        }
+    }
+
+    /**
+     * Factory that creates {@link DiscoveryServiceImpl} instances for a fixed
+     * cluster group id and service group id.
+     * <p>
+     * It is passed to the component via setFactory(..., "getInstance"), so the
+     * name of {@link #getInstance()} must stay in sync with that string literal.
+     */
+    static class DiscoveryServiceFactory {
+
+        private final String m_clusterGroupId;
+        private final String m_serviceGroupId;
+
+        /**
+         * Captures the constructor arguments that every created service receives.
+         *
+         * @param clusterGroupId id passed on to the created service
+         * @param serviceGroupId id passed on to the created service
+         */
+        public DiscoveryServiceFactory(String clusterGroupId, String 
serviceGroupId) {
+            m_clusterGroupId = clusterGroupId;
+            m_serviceGroupId = serviceGroupId;
+        }
+
+        /**
+         * Creates a new service instance on every call.
+         *
+         * @return a new {@link DiscoveryServiceImpl} built from the captured ids
+         */
+        public DiscoveryService getInstance() {
+            return new DiscoveryServiceImpl(m_clusterGroupId, 
m_serviceGroupId);
+        }
+    }
+
+    /**
+     * Factory that creates {@link DistributionServiceImpl} instances for a fixed
+     * cluster group id and service group id.
+     * <p>
+     * It is passed to the component via setFactory(..., "getInstance"), so the
+     * name of {@link #getInstance()} must stay in sync with that string literal.
+     */
+    static class DistributionServiceFactory {
+
+        private final String m_clusterGroupId;
+        private final String m_serviceGroupId;
+
+        /**
+         * Captures the constructor arguments that every created service receives.
+         *
+         * @param clusterGroupId id passed on to the created service
+         * @param serviceGroupId id passed on to the created service
+         */
+        public DistributionServiceFactory(String clusterGroupId, String 
serviceGroupId) {
+            m_clusterGroupId = clusterGroupId;
+            m_serviceGroupId = serviceGroupId;
+        }
+
+        /**
+         * Creates a new service instance on every call.
+         *
+         * @return a new {@link DistributionServiceImpl} built from the captured ids
+         */
+        public DistributionService getInstance() {
+            return new DistributionServiceImpl(m_clusterGroupId, 
m_serviceGroupId);
+        }
+    }
 }

Added: 
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/ChannelMemberUtilitiesTest.java
==============================================================================
--- (empty file)
+++ 
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/ChannelMemberUtilitiesTest.java
       Mon Jan  3 17:18:44 2011
@@ -0,0 +1,43 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.remote.service;
+
+import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link ClusterMemberUtilities#isValidChannelName(String)}.
+ * <p>
+ * NOTE(review): the class is named ChannelMemberUtilitiesTest and lives in the
+ * remote.service package, while the class under test is ClusterMemberUtilities
+ * from cluster.internal - confirm whether the name/location is intentional.
+ */
+public class ChannelMemberUtilitiesTest {
+
+    /**
+     * Checks a set of names that must be accepted and a set that must be rejected.
+     */
+    @Test
+    public void testIsValidChannelName() {
+        // Accepted: letters, digits, '_' and '-', including a single character
+        // and names that start with a digit or end with '_'.
+        Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("a"));
+        Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("hello"));
+        
Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("hello_world"));
+        
Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("hello-world"));
+        
Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("hello_123"));
+        
Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("123hello_"));
+
+        // Rejected: null, empty, whitespace anywhere, and the '.' and '/' characters.
+        Assert.assertFalse(ClusterMemberUtilities.isValidChannelName(null));
+        Assert.assertFalse(ClusterMemberUtilities.isValidChannelName(""));
+        Assert.assertFalse(ClusterMemberUtilities.isValidChannelName(" "));
+        // NOTE(review): the next two literals are split across lines, which looks
+        // like mail line-wrapping of " hello" and "hello " - confirm against the
+        // repository before applying this patch.
+        Assert.assertFalse(ClusterMemberUtilities.isValidChannelName(" 
hello"));
+        Assert.assertFalse(ClusterMemberUtilities.isValidChannelName("hello 
"));
+        
Assert.assertFalse(ClusterMemberUtilities.isValidChannelName("hello.world"));
+        
Assert.assertFalse(ClusterMemberUtilities.isValidChannelName("hello/world"));
+    }
+
+}

Modified: 
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
        (original)
+++ 
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
        Mon Jan  3 17:18:44 2011
@@ -1,3 +1,19 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
 package org.amdatu.core.fabric.remote.service;
 
 import java.util.Dictionary;

Reply via email to