Author: bdekruijff at gmail.com
Date: Tue Jan 11 10:25:35 2011
New Revision: 582

Log:
[sandbox] mcast disabling support / static member support / member events / 
more routing / robustness and fixes

Added:
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberAddedEvent.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberRemovedEvent.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelBuilder.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/NoMcastMembershipServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/StaticMemberMonitorInterceptor.java
Removed:
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelCreator.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/IntrospectionUtils.java
Modified:
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
   
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java
    (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java
    Tue Jan 11 10:25:35 2011
@@ -6,9 +6,9 @@
 
     String CONFIGURATION_PID = "org.amdatu.core.fabric";
 
-    boolean createClusterMember(String clusterGroupId, String clusterMemberId, 
Dictionary<String, Object> properties);
+    boolean createClusterChannel(String clusterGroupId, Dictionary<String, 
Object> properties);
 
-    boolean removeClusterMember(String clusterGroupId, String clusterMemberId);
+    boolean removeClusterChannel(String clusterGroupId);
 
     boolean createDiscovery(String clusterGroupId, String serviceGroupId);
 

Added: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberAddedEvent.java
==============================================================================
--- (empty file)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberAddedEvent.java
        Tue Jan 11 10:25:35 2011
@@ -0,0 +1,22 @@
+package org.amdatu.core.fabric.cluster;
+
+public class MemberAddedEvent {
+
+    private final String m_memberId;
+
+    public MemberAddedEvent(final String memberId) {
+        m_memberId = memberId;
+    }
+
+    public String getMemberId() {
+        return m_memberId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new 
StringBuilder(MemberAddedEvent.class.getSimpleName() + "{");
+        sb.append("\n\tmemberId: " + m_memberId);
+        sb.append("\n}");
+        return sb.toString();
+    }
+}

Added: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberRemovedEvent.java
==============================================================================
--- (empty file)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberRemovedEvent.java
      Tue Jan 11 10:25:35 2011
@@ -0,0 +1,22 @@
+package org.amdatu.core.fabric.cluster;
+
+public class MemberRemovedEvent {
+
+    private final String m_memberId;
+
+    public MemberRemovedEvent(final String memberId) {
+        m_memberId = memberId;
+    }
+
+    public String getMemberId() {
+        return m_memberId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new 
StringBuilder(MemberRemovedEvent.class.getSimpleName() + "{");
+        sb.append("\n\tmemberId: " + m_memberId);
+        sb.append("\n}");
+        return sb.toString();
+    }
+}

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
 (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
 Tue Jan 11 10:25:35 2011
@@ -97,4 +97,17 @@
     public final void setTargetServiceGroup(String serviceGroup) {
         m_targetServiceGroup = serviceGroup;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("RoutableMessage{");
+        sb.append("\n\toriginClusterId=" + m_originClusterId);
+        sb.append("\n\toriginMemberId=" + m_originMemberId);
+        sb.append("\n\toriginServiceGroup=" + m_originServiceGroup);
+        sb.append("\n\ttargetClusterId=" + m_targetClusterId);
+        sb.append("\n\ttargetMemberId=" + m_targetMemberId);
+        sb.append("\n\ttargetServiceGroup=" + m_targetServiceGroup);
+        sb.append("\n}");
+        return sb.toString();
+    }
 }
\ No newline at end of file

Added: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelBuilder.java
==============================================================================
--- (empty file)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelBuilder.java
  Tue Jan 11 10:25:35 2011
@@ -0,0 +1,220 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.cluster.internal.tribes;
+
+import java.io.IOException;
+import java.util.Dictionary;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipService;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.membership.McastService;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+import org.apache.catalina.tribes.transport.bio.PooledMultiSender;
+import org.apache.catalina.tribes.transport.nio.NioReceiver;
+
+/**
+ * Build an Apache Tribes-based cluster channel specific to Amdatu clusters.
+ * 
+ * @see 
http://svn.apache.org/repos/asf/tomcat/tc7.0.x/tags/TOMCAT_7_0_5/test/org/apache/catalina/tribes/demos/ChannelCreator.java
+ * 
+ */
+public class ChannelBuilder {
+
+    /**
+     * Type of membership discovery. Default is "mcast" for dynamic discovery 
and
+     * optional additional static members. To disable multicast use "static".
+     */
+    public static final String PROP_MEMBERSHIP = "membership";
+    public static final String PROP_MEMBERSHIP_DEFAULT = "mcast";
+
+    /**
+     * Comma separated list of static members that must be represented in the 
form
+     * hostname:port.
+     */
+    public static final String PROP_STATICMEMBERS = "static";
+    public static final String PROP_STATICMEMBERS_DEFAULT = "";
+
+    /**
+     * Receiver component hostname. Default is "auto".
+     */
+    public static final String PROP_TCPBIND = "tcpbind";
+    public static final String PROP_TCPBIND_DEFAULT = "auto";
+
+    /**
+     * Receiver component port number. Default is "4001".
+     */
+    public static final String PROP_TCPPORT = "tcpport";
+    public static final int PROP_TCPPORT_DEFAULT = 4001;
+
+    /**
+     * Receiver component selector timeout in milliseconds for inactive
+     * connections. Default is "5000".
+     */
+    public static final String PROP_TCPSELTO = "tcpselo";
+    public static final int PROP_TCPSELTO_DEFAULT = 5000;
+
+    /**
+     * Receiver component number of threads. Default is "4".
+     */
+    public static final String PROP_TCPTHREADS = "tcpthreads";
+    public static final int PROP_TCPTHREADS_DEFAULT = 4;
+
+    /**
+     * Sender component handshake timeout in milliseconds for inactive 
connections.
+     * Default is "500".
+     */
+    public static final String PROP_TCKACKTO = "tcpackto";
+    public static final int PROP_TCPACKTO_DEFAULT = 500;
+
+    /**
+     * Membership component multicast address (class D). Default is 
"228.0.0.5".
+     */
+    public static final String PROP_MCASTADDR = "maddr";
+    public static final String PROP_MCASTADDR_DEFAULT = "228.0.0.5";
+
+    /**
+     * Membership component multicast port. Default value is "45565".
+     */
+    public static final String PROP_MCASTPORT = "mport";
+    public static final int PROP_MCASTPORT_DEFAULT = 45565;
+
+    /**
+     * Membership component hostname. Default value is "".
+     */
+    public static final String PROP_MCASTBIND = "mbind";
+    public static final String PROP_MCASTBIND_DEFAULT = "";
+
+    /**
+     * Membership component frequency of discovery requests in milliseconds.
+     * Default value is "500".
+     */
+    public static final String PROP_MCASTFREQ = "freq";
+    public static final int PROP_MCASTFREQ_DEFAULT = 500;
+
+    /**
+     * Membership component timeout for discovery requests in milliseconds.
+     * Default value is "2000".
+     */
+    public static final String PROP_MCASTDROP = "mdrop";
+    public static final int PROP_MCASTDROP_DEFAULT = 2000;
+
+    private static final String DOMAIN_ENCODING = "ISO-8859-1";
+
+    public static Channel createChannel(String channelName, Dictionary<String, 
Object> options) throws Exception {
+
+        byte[] domain = channelName.getBytes(DOMAIN_ENCODING);
+
+        NioReceiver rx = new NioReceiver();
+        rx.setAddress(checkStringProperty(options.get(PROP_TCPBIND), 
PROP_TCPBIND_DEFAULT));
+        rx.setPort(checkIntegerProperty(options.get(PROP_TCPPORT), 
PROP_TCPPORT_DEFAULT));
+        rx.setSelectorTimeout(checkIntegerProperty(options.get(PROP_TCPSELTO), 
PROP_TCPSELTO_DEFAULT));
+        rx.setMaxThreads(checkIntegerProperty(options.get(PROP_TCPTHREADS), 
PROP_TCPTHREADS_DEFAULT));
+        rx.setMinThreads(checkIntegerProperty(options.get(PROP_TCPTHREADS), 
PROP_TCPTHREADS_DEFAULT));
+        rx.getBind();
+        rx.setRxBufSize(43800);
+        rx.setTxBufSize(25188);
+        rx.setAutoBind(0);
+
+        ReplicationTransmitter tx = new ReplicationTransmitter();
+        PooledMultiSender sender = new PooledMultiSender();
+        sender.setTimeout(checkIntegerProperty(options.get(PROP_TCKACKTO), 
PROP_TCPACKTO_DEFAULT));
+        sender.setMaxRetryAttempts(2);
+        sender.setRxBufSize(43800);
+        sender.setTxBufSize(25188);
+        tx.setTransport(sender);
+
+        String membership = checkStringProperty(options.get(PROP_MEMBERSHIP), 
PROP_MEMBERSHIP_DEFAULT);
+        MembershipService ms;
+        if (membership.equals("static")) {
+            ms = new NoMcastMembershipServiceImpl();
+            ms.setDomain(domain);
+        }
+        else {
+            McastService mc = new McastService();
+            mc.setAddress(checkStringProperty(options.get(PROP_MCASTADDR), 
PROP_MCASTADDR_DEFAULT));
+            if (checkStringProperty(options.get(PROP_MCASTBIND), 
PROP_MCASTBIND_DEFAULT).equals(""))
+                
mc.setMcastBindAddress(checkStringProperty(options.get(PROP_MCASTBIND), 
PROP_MCASTBIND_DEFAULT));
+            mc.setFrequency(checkIntegerProperty(options.get(PROP_MCASTFREQ), 
PROP_MCASTFREQ_DEFAULT));
+            
mc.setMcastDropTime(checkIntegerProperty(options.get(PROP_MCASTDROP), 
PROP_MCASTDROP_DEFAULT));
+            mc.setPort(checkIntegerProperty(options.get(PROP_MCASTPORT), 
PROP_MCASTPORT_DEFAULT));
+            mc.setDomain(domain);
+            ms = mc;
+        }
+
+        ManagedChannel channel = new GroupChannel();
+        channel.setChannelReceiver(rx);
+        channel.setChannelSender(tx);
+        channel.setMembershipService(ms);
+
+        Member[] staticMembers =
+            
memberArrayFromString(checkStringProperty(options.get(PROP_STATICMEMBERS), 
PROP_STATICMEMBERS_DEFAULT),
+                domain);
+        if (staticMembers != null && staticMembers.length > 0) {
+            StaticMemberMonitorInterceptor smi = new 
StaticMemberMonitorInterceptor();
+            for (int i = 0; i < staticMembers.length; i++) {
+                smi.addStaticMember(staticMembers[i]);
+            }
+            channel.addInterceptor(smi);
+        }
+        return channel;
+    }
+
+    private static String checkStringProperty(Object property, String 
defaultValue) {
+        if (property == null || !(property instanceof String) || 
property.equals(""))
+            return defaultValue;
+        return (String) property;
+    }
+
+    private static int checkIntegerProperty(Object property, int defaultValue) 
{
+        if (property != null) {
+            if (property instanceof String) {
+                try {
+                    return Integer.parseInt((String) property);
+                }
+                catch (NumberFormatException e) {}
+            }
+            else {
+                if (property instanceof Integer) {
+                    return ((Integer) property).intValue();
+
+                }
+            }
+        }
+        return defaultValue;
+    }
+
+    private static Member[] memberArrayFromString(String property, byte[] 
domain) throws IOException {
+        Member[] members = null;
+        if (property != null && !property.equals("")) {
+            String[] memberparts = property.split(",");
+            members = new Member[memberparts.length];
+            for (int i = 0; i < memberparts.length; i++) {
+                String[] hostporttuples = memberparts[i].split(":");
+                MemberImpl memberimpl = new MemberImpl();
+                memberimpl.setHostname(hostporttuples[0]);
+                memberimpl.setPort(Integer.parseInt(hostporttuples[1]));
+                memberimpl.setDomain(domain);
+                members[i] = memberimpl;
+            }
+        }
+        return members;
+    }
+}
\ No newline at end of file

Added: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/NoMcastMembershipServiceImpl.java
==============================================================================
--- (empty file)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/NoMcastMembershipServiceImpl.java
    Tue Jan 11 10:25:35 2011
@@ -0,0 +1,163 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.cluster.internal.tribes;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.MembershipService;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+
+/**
+ * Alternative to McastService that does next to nothing, intended for
+ * static-membership-only scenarios. It does nothing besides
+ * managing the local member.
+ * 
+ * @see StaticMemberMonitorInterceptor
+ * 
+ */
+public class NoMcastMembershipServiceImpl implements MembershipService {
+
+    protected static final Member[] EMPTY_MEMBERS = new Member[0];
+    protected static final String[] EMPTY_MEMBERNAMES = new String[0];
+
+    protected MembershipListener m_membershipListener;
+
+    protected long m_serviceStartTime;
+
+    protected Properties m_properties = new Properties();
+
+    protected MemberImpl m_localMember;
+    protected Member[] m_members;
+
+    protected byte[] m_payload;
+
+    protected byte[] m_domain;
+
+    public Member findMemberByName(String name) {
+        return null;
+    }
+
+    public byte[] getPayload() {
+        return m_payload;
+    }
+
+    public byte[] getDomain() {
+        return m_domain;
+    }
+
+    public Member getLocalMember(boolean alive) {
+        if (alive && m_localMember != null) {
+            m_localMember.setMemberAliveTime(System.currentTimeMillis() - 
m_serviceStartTime);
+        }
+        return m_localMember;
+    }
+
+    public Member getMember(Member member) {
+        return null;
+    }
+
+    public Member[] getMembers() {
+        return EMPTY_MEMBERS;
+    }
+
+    public String[] getMembersByName() {
+        return EMPTY_MEMBERNAMES;
+    }
+
+    public Properties getProperties() {
+        return m_properties;
+    }
+
+    public boolean hasMembers() {
+        return false;
+    }
+
+    public void removeMembershipListener() {
+        m_membershipListener = null;
+    }
+
+    public void setDomain(byte[] domain) {
+        m_domain = domain;
+    }
+
+    public void setLocalMemberProperties(String listenHost, int listenPort) {
+        m_properties.setProperty("tcpListenHost", listenHost);
+        m_properties.setProperty("tcpListenPort", String.valueOf(listenPort));
+        try {
+            if (m_localMember != null) {
+                m_localMember.setHostname(listenHost);
+                m_localMember.setPort(listenPort);
+            }
+            else {
+                m_localMember = new MemberImpl(listenHost, listenPort, 0);
+                m_localMember.setUniqueId(UUIDGenerator.randomUUID(true));
+                m_localMember.setPayload(getPayload());
+                m_localMember.setDomain(getDomain());
+            }
+            m_localMember.getData(true, true);
+        }
+        catch (IOException x) {
+            throw new IllegalArgumentException(x);
+        }
+    }
+
+    public void setMembershipListener(MembershipListener membershipListener) {
+        m_membershipListener = membershipListener;
+    }
+
+    public void setPayload(byte[] payload) {
+        m_payload = payload;
+    }
+
+    public void setProperties(Properties properties) {
+        this.m_properties = properties;
+    }
+
+    public void start() throws Exception {
+        start(0);
+    }
+
+    public void start(int arg0) throws Exception {
+        String host = getProperties().getProperty("tcpListenHost");
+        int port = 
Integer.parseInt(getProperties().getProperty("tcpListenPort"));
+        if (m_localMember == null) {
+            m_localMember = new MemberImpl(host, port, 100);
+            m_localMember.setUniqueId(UUIDGenerator.randomUUID(true));
+        }
+        else {
+            m_localMember.setHostname(host);
+            m_localMember.setPort(port);
+            m_localMember.setMemberAliveTime(100);
+        }
+        m_serviceStartTime = System.currentTimeMillis();
+    }
+
+    public void stop(int arg0) {
+    }
+
+    public void memberAdded(Member member) {
+        m_membershipListener.memberAdded(member);
+    }
+
+    public void memberDisappeared(Member member) {
+        m_membershipListener.memberDisappeared(member);
+    }
+}

Added: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/StaticMemberMonitorInterceptor.java
==============================================================================
--- (empty file)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/StaticMemberMonitorInterceptor.java
  Tue Jan 11 10:25:35 2011
@@ -0,0 +1,462 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.cluster.internal.tribes;
+
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.util.Arrays;
+
+/**
+ * Tribes interceptor that actively monitors the availability and state of 
configured static
+ * cluster members. This is NOT implemented as a MembershipService so that it 
can operate
+ * in conjunction with the McastService membership service.
+ * 
+ * TODO bridge to LogService
+ * TODO make timeouts configurable
+ */
+public final class StaticMemberMonitorInterceptor extends 
ChannelInterceptorBase {
+
+    public static final byte[] TCP_HEARTBEAT = new byte[] {
+        85, -10, -108, -73, 58, -6, 64, 120, -111, 4, 125, -41, 114, -124, 
-64, -43 };
+
+    public static final byte[] STARTUP_PAYLOAD = new byte[] { 67, 65, 66, 89, 
45, 65, 76, 69, 88 };
+    public static final byte[] SHUTDOWN_PAYLOAD = new byte[] { 66, 65, 66, 89, 
45, 65, 76, 69, 88 };
+
+    private static final long ALIVE_HEARTBEAT_INTERVAL = 5000;
+    private static final long ALIVE_TIMEOUT_INTERVAL = 10000;
+
+    private final ConcurrentHashMap<StaticMemberKey, StaticMemberWrapper> 
m_configuredMembers =
+        new ConcurrentHashMap<StaticMemberKey, StaticMemberWrapper>();
+
+    private final LinkedBlockingQueue<Member> m_remoteHeartbeats = new 
LinkedBlockingQueue<Member>();
+
+    private Thread m_stateMonitor;
+    private volatile boolean m_runStateMonitor;
+
+    public StaticMemberMonitorInterceptor() {
+        super();
+    }
+
+    public void addStaticMember(Member member) {
+        if (member == null)
+            return;
+        StaticMemberKey key = StaticMemberKey.get(member);
+        m_configuredMembers.putIfAbsent(key, new StaticMemberWrapper(key, 
member));
+    }
+
+    public void removeStaticMember(Member member) {
+        if (member == null)
+            return;
+        StaticMemberKey key = StaticMemberKey.get(member);
+        StaticMemberWrapper wrapper = m_configuredMembers.remove(key);
+        if (wrapper.isActive()) {
+            memberDisappeared(wrapper.getRemoteMember());
+        }
+    }
+
+    @Override
+    public void start(int svc) throws ChannelException {
+        for (Entry<StaticMemberKey, StaticMemberWrapper> entry : 
m_configuredMembers.entrySet()) {
+            StaticMemberWrapper wrapper = entry.getValue();
+            sendHeartbeatMessage(wrapper.getConfiguredMember(), 
STARTUP_PAYLOAD);
+        }
+        startStateMonitor();
+        super.start(svc);
+    }
+
+    @Override
+    public void stop(int svc) throws ChannelException {
+        stopStateMonitor();
+        for (Entry<StaticMemberKey, StaticMemberWrapper> entry : 
m_configuredMembers.entrySet()) {
+            StaticMemberWrapper wrapper = entry.getValue();
+            sendHeartbeatMessage(wrapper.getConfiguredMember(), 
SHUTDOWN_PAYLOAD);
+        }
+        super.stop(svc);
+    }
+
+    @Override
+    public synchronized void heartbeat() {
+        super.heartbeat();
+        if (m_runStateMonitor)
+            updateConfiguredMemberStates();
+    }
+
+    @Override
+    public void messageReceived(ChannelMessage msg) {
+        if (isHeartBeatMessage(msg.getMessage().getBytes())) {
+
+            try {
+                Member remoteMember = 
MemberImpl.getMember(msg.getMessage().getBytes(), TCP_HEARTBEAT.length,
+                    msg.getMessage().getLength() - TCP_HEARTBEAT.length);
+                m_remoteHeartbeats.offer(remoteMember);
+                return;
+            }
+            catch (IllegalArgumentException e) {
+                // Thrown by MemberImpl if something is wrong with the bytes. 
It probably indicates that our
+                // HEARTBEAT prefix is not unique enough and needs attention. 
We pass it on as a regular
+                // message as a fallback.
+                e.printStackTrace();
+            }
+        }
+        super.messageReceived(msg);
+    }
+
+    @Override
+    public void memberAdded(Member member) {
+        System.err.println("Added: " + member.toString());
+        super.memberAdded(member);
+    }
+
+    @Override
+    public void memberDisappeared(Member member) {
+        System.err.println("Removed: " + member.toString());
+        super.memberDisappeared(member);
+    }
+
+    private void startStateMonitor() {
+        m_runStateMonitor = true;
+        m_stateMonitor = new Thread(new Runnable() {
+            public void run() {
+                while (m_runStateMonitor) {
+                    processHeartbeats();
+                }
+            }
+        });
+        m_stateMonitor.setDaemon(true);
+        m_stateMonitor.setPriority(Thread.MAX_PRIORITY);
+        m_stateMonitor.start();
+    }
+
+    private void stopStateMonitor() {
+        m_runStateMonitor = false;
+    }
+
+    private void processHeartbeats() {
+        try {
+            Member remoteMember = m_remoteHeartbeats.take();
+            long currentTimeMillis = System.currentTimeMillis();
+            StaticMemberKey key = StaticMemberKey.get(remoteMember);
+            StaticMemberWrapper wrapper = m_configuredMembers.get(key);
+            if (wrapper == null) {
+                return;
+            }
+            processHeartbeat(wrapper, remoteMember, currentTimeMillis);
+        }
+        catch (InterruptedException e) {}
+    }
+
+    private void processHeartbeat(StaticMemberWrapper wrapper, Member 
remoteMember, long currentTimeMillis) {
+        if (!Arrays.equals(getLocalMember(false).getDomain(), 
remoteMember.getDomain())) {
+            // member from other domain
+            return;
+        }
+        if (Arrays.equals(remoteMember.getCommand(), STARTUP_PAYLOAD)) {
+            // member startup
+            wrapper.reset();
+            wrapper.setRemoteMember(remoteMember);
+            wrapper.updateLastHeartbeat(currentTimeMillis);
+            if (sendHeartbeatMessage(wrapper.getConfiguredMember())) {
+                wrapper.updateLastReached(currentTimeMillis);
+                wrapper.setActive(true);
+                memberAdded(remoteMember);
+            }
+            return;
+        }
+        if (Arrays.equals(remoteMember.getCommand(), SHUTDOWN_PAYLOAD)) {
+            // member shutdown
+            if (wrapper.isActive()) {
+                wrapper.reset();
+                memberDisappeared(remoteMember);
+            }
+            return;
+        }
+        if (wrapper.getRemoteMember() == null) {
+            // member appears
+            wrapper.reset();
+            wrapper.setRemoteMember(remoteMember);
+            wrapper.updateLastHeartbeat(currentTimeMillis);
+            if (sendHeartbeatMessage(wrapper.getConfiguredMember())) {
+                wrapper.updateLastReached(currentTimeMillis);
+                wrapper.setActive(true);
+                memberAdded(remoteMember);
+            }
+            return;
+        }
+        if (!Arrays.equals(wrapper.getRemoteMember().getUniqueId(), 
remoteMember.getUniqueId())) {
+            // member changes
+            memberDisappeared(remoteMember);
+            wrapper.reset();
+            wrapper.setRemoteMember(remoteMember);
+            wrapper.updateLastHeartbeat(currentTimeMillis);
+            if (sendHeartbeatMessage(wrapper.getConfiguredMember())) {
+                wrapper.updateLastReached(currentTimeMillis);
+                wrapper.setActive(true);
+                memberAdded(remoteMember);
+            }
+            return;
+        }
+        if (!wrapper.isActive()) {
+            // member inactive
+            wrapper.updateLastHeartbeat(currentTimeMillis);
+            if (sendHeartbeatMessage(wrapper.getConfiguredMember())) {
+                wrapper.updateLastReached(currentTimeMillis);
+                wrapper.setActive(true);
+                memberAdded(remoteMember);
+            }
+            return;
+        }
+        // all is well
+        wrapper.updateLastHeartbeat(currentTimeMillis);
+    }
+
+    private void updateConfiguredMemberStates() {
+        long currentTimeMillis = System.currentTimeMillis();
+        for (Entry<StaticMemberKey, StaticMemberWrapper> entry : 
m_configuredMembers.entrySet()) {
+            StaticMemberWrapper wrapper = entry.getValue();
+            updateConfiguredMemberState(currentTimeMillis, wrapper);
+        }
+    }
+
+    /**
+     * Advances the liveness state of a single configured member.
+     * <p>
+     * An active member is dropped when it has no remote member attached,
+     * when its last heartbeat timestamp is older than ALIVE_TIMEOUT_INTERVAL,
+     * or when a due outgoing heartbeat fails. An inactive member is probed
+     * when a heartbeat is due, and is activated once both its last heartbeat
+     * timestamp and its last successful probe are recent enough.
+     *
+     * @param currentTimeMillis timestamp of the current update pass
+     * @param wrapper state holder of the configured member to update
+     */
+    private void updateConfiguredMemberState(long currentTimeMillis,
+        StaticMemberWrapper wrapper) {
+        if (!wrapper.isActive()) {
+            // wrapper is inactive: probe it when a heartbeat is due
+            boolean probeDue =
+                (currentTimeMillis - wrapper.getLastReached()) > ALIVE_HEARTBEAT_INTERVAL;
+            if (probeDue && sendHeartbeatMessage(wrapper.getConfiguredMember())) {
+                wrapper.updateLastReached(currentTimeMillis);
+            }
+            // member revived; timestamps are re-read because the probe above
+            // may just have refreshed the last-reached value
+            boolean heardRecently =
+                (currentTimeMillis - wrapper.getLastHeartbeat()) < ALIVE_TIMEOUT_INTERVAL;
+            if (heardRecently
+                && (currentTimeMillis - wrapper.getLastReached()) < ALIVE_HEARTBEAT_INTERVAL) {
+                wrapper.setActive(true);
+                memberAdded(wrapper.getRemoteMember());
+            }
+            return;
+        }
+
+        // should not happen: active without a remote member
+        if (wrapper.getRemoteMember() == null) {
+            wrapper.reset();
+            return;
+        }
+
+        // member silent
+        long sinceHeartbeat = currentTimeMillis - wrapper.getLastHeartbeat();
+        if (sinceHeartbeat > ALIVE_TIMEOUT_INTERVAL) {
+            memberDisappeared(wrapper.getRemoteMember());
+            wrapper.reset();
+            return;
+        }
+
+        // reached recently enough, nothing to do in this pass
+        long sinceReached = currentTimeMillis - wrapper.getLastReached();
+        if (sinceReached <= ALIVE_HEARTBEAT_INTERVAL) {
+            return;
+        }
+
+        // member stale: a failed heartbeat removes it right away
+        if (sendHeartbeatMessage(wrapper.getConfiguredMember())) {
+            wrapper.updateLastReached(currentTimeMillis);
+            return;
+        }
+        memberDisappeared(wrapper.getRemoteMember());
+        wrapper.reset();
+    }
+
+    /**
+     * Sends a heartbeat to the given member without a member command.
+     *
+     * @param member the member to send the heartbeat to
+     * @return the result of the two-argument overload called with a
+     *         {@code null} command
+     */
+    private boolean sendHeartbeatMessage(Member member) {
+        final byte[] noCommand = null;
+        return sendHeartbeatMessage(member, noCommand);
+    }
+
+    /**
+     * Opens a short-lived TCP connection to the member, writes one package
+     * made of the TCP_HEARTBEAT marker followed by the local member's data,
+     * and then reads from the socket.
+     *
+     * @param member the member to contact; its host and port are used
+     * @param command command set on the local member before its data is
+     *        serialized into the heartbeat
+     * @return {@code true} if at least one byte was read back; {@code false}
+     *         if nothing was read or any exception occurred
+     */
+    private boolean sendHeartbeatMessage(Member member, byte[] command) {
+        // used for both the connect timeout and the read timeout
+        final int timeoutMillis = 100;
+        final Socket socket = new Socket();
+        try {
+            InetAddress host = InetAddress.getByAddress(member.getHost());
+            InetSocketAddress target = new InetSocketAddress(host, member.getPort());
+            socket.setSoTimeout(timeoutMillis);
+            socket.connect(target, timeoutMillis);
+
+            // payload: heartbeat marker + serialized local member
+            // NOTE(review): the command stays set on the local member after
+            // this call - confirm that this is intended
+            MemberImpl self = (MemberImpl) getLocalMember(false);
+            self.setCommand(command);
+            byte[] selfData = self.getData();
+            XByteBuffer payload = new XByteBuffer(TCP_HEARTBEAT, false);
+            payload.append(selfData, 0, selfData.length);
+
+            ChannelData envelope = new ChannelData(true);
+            envelope.setAddress(member);
+            envelope.setMessage(payload);
+            envelope.setTimestamp(System.currentTimeMillis());
+            envelope.setOptions(optionFlag
+                | Channel.SEND_OPTIONS_BYTE_MESSAGE
+                | Channel.SEND_OPTIONS_USE_ACK);
+
+            byte[] wire = XByteBuffer.createDataPackage(envelope);
+            socket.getOutputStream().write(wire);
+            // the outgoing buffer doubles as scratch space for the reply
+            return socket.getInputStream().read(wire) > 0;
+        }
+        catch (Exception ignored) {
+            // best effort: timeouts, refused connections and any other
+            // failure all mean the member was not reached
+        }
+        finally {
+            try {
+                socket.close();
+            }
+            catch (Exception ignored) {}
+        }
+        return false;
+    }
+
+    /**
+     * Tells whether the given bytes start with the TCP_HEARTBEAT marker.
+     *
+     * @param bytes the bytes to inspect, may be {@code null}
+     * @return {@code true} if {@code bytes} is at least as long as the
+     *         marker and begins with it, {@code false} otherwise
+     */
+    private boolean isHeartBeatMessage(byte[] bytes) {
+        if (bytes == null) {
+            return false;
+        }
+        final int markerLength = TCP_HEARTBEAT.length;
+        if (bytes.length < markerLength) {
+            return false;
+        }
+        int index = 0;
+        while (index < markerLength && bytes[index] == TCP_HEARTBEAT[index]) {
+            index++;
+        }
+        // the loop only runs to the end when every marker byte matched
+        return index == markerLength;
+    }
+
+    /**
+     * Mutable bookkeeping record for one configured static member: the
+     * configured member itself, the remote and update members currently
+     * attached to it, an active flag and two timestamps.
+     */
+    static class StaticMemberWrapper {
+
+        // fixed identity of the configured member
+        private final StaticMemberKey m_key;
+        private final Member m_configuredMember;
+
+        // runtime state; everything below is cleared by reset()
+        private Member m_remoteMember;
+        private Member m_updateMember;
+        private boolean m_active;
+        private long m_lastHeartbeat;
+        private long m_lastReached;
+
+        /**
+         * Creates an inactive wrapper with no remote or update member and
+         * both timestamps at zero.
+         *
+         * @param key the key of the configured member
+         * @param configuredMember the configured member
+         */
+        public StaticMemberWrapper(StaticMemberKey key, Member configuredMember) {
+            m_key = key;
+            m_configuredMember = configuredMember;
+        }
+
+        /** @return the key passed at construction */
+        public StaticMemberKey getKey() {
+            return m_key;
+        }
+
+        /** @return the configured member passed at construction */
+        public Member getConfiguredMember() {
+            return m_configuredMember;
+        }
+
+        /** @return the remote member, or {@code null} if none is set */
+        public Member getRemoteMember() {
+            return m_remoteMember;
+        }
+
+        /** @param remote the remote member to attach */
+        public void setRemoteMember(Member remote) {
+            m_remoteMember = remote;
+        }
+
+        /** @return the update member, or {@code null} if none is set */
+        public Member getUpdateMember() {
+            return m_updateMember;
+        }
+
+        /** @param update the update member to attach */
+        public void setUpdateMember(Member update) {
+            m_updateMember = update;
+        }
+
+        /** @return the last heartbeat timestamp, zero if never updated */
+        public long getLastHeartbeat() {
+            return m_lastHeartbeat;
+        }
+
+        /** @param timestamp the new last heartbeat timestamp */
+        public void updateLastHeartbeat(long timestamp) {
+            m_lastHeartbeat = timestamp;
+        }
+
+        /** @return the last reached timestamp, zero if never updated */
+        public long getLastReached() {
+            return m_lastReached;
+        }
+
+        /** @param timestamp the new last reached timestamp */
+        public void updateLastReached(long timestamp) {
+            m_lastReached = timestamp;
+        }
+
+        /** @return whether this member is currently flagged active */
+        public boolean isActive() {
+            return m_active;
+        }
+
+        /** @param active the new value of the active flag */
+        public void setActive(boolean active) {
+            m_active = active;
+        }
+
+        /**
+         * Restores the initial runtime state: inactive, no remote or update
+         * member, both timestamps zero. Key and configured member are kept.
+         */
+        public void reset() {
+            m_remoteMember = null;
+            m_updateMember = null;
+            m_lastHeartbeat = 0;
+            m_lastReached = 0;
+            m_active = false;
+        }
+    }
+
+    /**
+     * Map key that identifies a static member by its host address bytes and
+     * its port. Two keys are equal when both port and host bytes are equal.
+     */
+    static class StaticMemberKey {
+
+        private static final Map<Member, StaticMemberKey> KEY_CACHE =
+            new HashMap<Member, StaticMemberKey>();
+
+        /**
+         * Returns the cached key for a member, creating and caching it on
+         * first use. Synchronized because the cache is a plain HashMap and
+         * the check-then-put sequence must not interleave.
+         *
+         * @param member the member to get the key for
+         * @return the key built from the member's host and port
+         */
+        public static synchronized StaticMemberKey get(Member member) {
+            StaticMemberKey key = KEY_CACHE.get(member);
+            if (key == null) {
+                key = new StaticMemberKey(member);
+                KEY_CACHE.put(member, key);
+            }
+            return key;
+        }
+
+        private final byte[] m_host;
+        private final int m_port;
+
+        /**
+         * @param member the member whose host and port make up the key
+         */
+        public StaticMemberKey(Member member) {
+            m_host = member.getHost();
+            m_port = member.getPort();
+        }
+
+        /**
+         * @param hostname the host address bytes (stored as is, not copied)
+         * @param port the port
+         */
+        public StaticMemberKey(byte[] hostname, int port) {
+            m_host = hostname;
+            m_port = port;
+        }
+
+        /** @return the host address bytes this key was created with */
+        public byte[] getHostName() {
+            return m_host;
+        }
+
+        /** @return the port this key was created with */
+        public int getPort() {
+            return m_port;
+        }
+
+        /**
+         * Compares port and host bytes. Returns {@code false} for
+         * {@code null} and for objects of any other type instead of
+         * throwing, as the {@code Object.equals} contract requires.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof StaticMemberKey)) {
+                return false;
+            }
+            StaticMemberKey other = (StaticMemberKey) obj;
+            return other.getPort() == m_port
+                && java.util.Arrays.equals(other.getHostName(), m_host);
+        }
+
+        /**
+         * Hashes all host bytes plus the port. Works for host arrays of any
+         * length (IPv4, IPv6, empty or {@code null}) and stays consistent
+         * with {@link #equals(Object)}.
+         */
+        @Override
+        public int hashCode() {
+            return 31 * java.util.Arrays.hashCode(m_host) + m_port;
+        }
+    }
+}
\ No newline at end of file

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
        (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
        Tue Jan 11 10:25:35 2011
@@ -21,13 +21,17 @@
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMember;
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
 import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.MemberAddedEvent;
+import org.amdatu.core.fabric.cluster.MemberRemovedEvent;
 import org.amdatu.core.fabric.cluster.RoutableMessage;
 import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
+import org.amdatu.core.fabric.remote.internal.EndpointInvokeMessage;
 import org.apache.felix.dm.Component;
 import org.apache.felix.dm.DependencyManager;
 import org.apache.felix.dm.ServiceDependency;
@@ -39,6 +43,8 @@
 
 /**
  * I manage cluster state
+ * 
+ * TODO add a static(?) getConfigurationProperties method
  */
 public abstract class ClusterMemberServiceBase implements ClusterMemberService 
{
 
@@ -46,28 +52,26 @@
     private final ReentrantReadWriteLock m_clusterMembersLock = new 
ReentrantReadWriteLock();
 
     private final String m_clusterId;
-    private final String m_memberId;
     private final Dictionary<String, Object> m_properties;
 
     private final String m_recieveEventTopic;
     private final String m_sendEventTopic;
 
-    // injected
     private volatile DependencyManager m_dependencyManager;
     private volatile Component m_component;
     private volatile EventAdmin m_eventAdmin;
     private volatile LogService m_logService;
 
     private volatile Component m_broadcastEventHandlerComponent;
+    private volatile String m_memberId;
 
     /********************************************************
      * Constructors
      ********************************************************/
 
-    public ClusterMemberServiceBase(String clusterGroupId, String 
clusterMemberId,
-        Dictionary<String, Object> properties) {
+    public ClusterMemberServiceBase(String clusterGroupId, Dictionary<String, 
Object> properties) {
         m_clusterId = clusterGroupId;
-        m_memberId = clusterMemberId;
+        m_memberId = UUID.randomUUID().toString();
         m_properties = new Hashtable<String, Object>();
         if (properties != null) {
             Enumeration<String> enumeration = properties.keys();
@@ -85,35 +89,10 @@
      ********************************************************/
 
     public final synchronized void init() {
-
-        @SuppressWarnings("unchecked")
-        Dictionary<String, Object> serviceProps = 
m_component.getServiceProperties();
-        if (serviceProps == null) {
-            serviceProps = new Hashtable<String, Object>();
-        }
-        
-        serviceProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY, 
m_clusterId);
-        serviceProps.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY, 
m_memberId);
-        serviceProps.put(ClusterMemberService.SERVICE_CONFIGURATION_PROPERTY, 
m_properties);
-        m_component.setServiceProperties(serviceProps);
-
-        ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
-        logServiceDependency.setService(LogService.class);
-        logServiceDependency.setRequired(true);
-        logServiceDependency.setInstanceBound(true);
-        m_component.add(logServiceDependency);
-
-        ServiceDependency eventAdminServiceDependency = 
m_dependencyManager.createServiceDependency();
-        eventAdminServiceDependency.setService(EventAdmin.class);
-        eventAdminServiceDependency.setRequired(true);
-        eventAdminServiceDependency.setInstanceBound(true);
-        m_component.add(eventAdminServiceDependency);
-
-        Dictionary<String, Object> props = new Hashtable<String, Object>();
-        props.put(EventConstants.EVENT_TOPIC, new String[] { m_sendEventTopic 
});
-        m_broadcastEventHandlerComponent = 
m_dependencyManager.createComponent();
-        
m_broadcastEventHandlerComponent.setInterface(EventHandler.class.getName(), 
props);
-        m_broadcastEventHandlerComponent.setImplementation(new 
BroadcastEventHandler());
+        updateServiceProperties();
+        initLogServiceDependency();
+        initEventAdminDependency();
+        initBroadcastEventHandlerComponent();
         onInit();
     }
 
@@ -122,15 +101,15 @@
     }
 
     public final synchronized void start() {
-        m_logService.log(LogService.LOG_WARNING, "Starting 
ClusterMemberService");
         m_dependencyManager.add(m_broadcastEventHandlerComponent);
         onStart();
+        m_logService.log(LogService.LOG_WARNING, "Started ClusterChannel: " + 
m_clusterId + "/" + m_memberId);
     }
 
     public final synchronized void stop() {
-        m_logService.log(LogService.LOG_WARNING, "Stopping 
ClusterMemberService");
         m_dependencyManager.remove(m_broadcastEventHandlerComponent);
         onStop();
+        m_logService.log(LogService.LOG_WARNING, "Stopped ClusterChannel: " + 
m_clusterId + "/" + m_memberId);
     }
 
     /********************************************************
@@ -149,6 +128,11 @@
         return m_memberId;
     }
 
+    protected final void setMemberId(String memberId) {
+        m_memberId = memberId;
+        updateServiceProperties();
+    }
+
     protected final Dictionary<String, Object> getProperties() {
         return m_properties;
 
@@ -182,6 +166,10 @@
         finally {
             m_clusterMembersLock.writeLock().unlock();
         }
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(EVENT_MESSAGE_PROPERTY, new 
MemberAddedEvent(clusterMember.getMemberId()));
+        Event broadCastEvent = new Event(m_recieveEventTopic, props);
+        m_eventAdmin.postEvent(broadCastEvent);
     }
 
     protected final void removeClusterMember(String memberId) {
@@ -192,9 +180,13 @@
         finally {
             m_clusterMembersLock.writeLock().unlock();
         }
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(EVENT_MESSAGE_PROPERTY, new MemberRemovedEvent(memberId));
+        Event broadCastEvent = new Event(m_recieveEventTopic, props);
+        m_eventAdmin.postEvent(broadCastEvent);
     }
 
-    protected final void dispatchMessage(Object message) {
+    protected final void dispatchMessage(Object message, ClusterMember sender) 
{
         Dictionary<String, Object> props = new Hashtable<String, Object>();
         props.put(EVENT_MESSAGE_PROPERTY, message);
         if (message instanceof LocalTopicMessage) {
@@ -224,46 +216,72 @@
     protected abstract void doSend(ClusterMember[] clusterMember, Object 
message);
 
     /********************************************************
-     * Helper classes
+     * Private methods
      ********************************************************/
 
-    class BroadcastEventHandler implements EventHandler {
+    private void updateServiceProperties() {
+        @SuppressWarnings("unchecked")
+        Dictionary<String, Object> serviceProps = 
m_component.getServiceProperties();
+        if (serviceProps == null)
+            serviceProps = new Hashtable<String, Object>();
+        serviceProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY, 
m_clusterId);
+        serviceProps.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY, 
m_memberId);
+        serviceProps.put(ClusterMemberService.SERVICE_CONFIGURATION_PROPERTY, 
m_properties);
+        m_component.setServiceProperties(serviceProps);
+    }
 
-        public void handleEvent(Event event) {
-            Object message = event.getProperty(EVENT_MESSAGE_PROPERTY);
-            if (message instanceof RoutableMessage) {
+    private void initBroadcastEventHandlerComponent() {
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(EventConstants.EVENT_TOPIC, new String[] { m_sendEventTopic 
});
+        m_broadcastEventHandlerComponent = 
m_dependencyManager.createComponent();
+        
m_broadcastEventHandlerComponent.setInterface(EventHandler.class.getName(), 
props);
+        m_broadcastEventHandlerComponent.setImplementation(new 
BroadcastEventHandler());
+    }
+
+    private void initEventAdminDependency() {
+        ServiceDependency eventAdminServiceDependency = 
m_dependencyManager.createServiceDependency();
+        eventAdminServiceDependency.setService(EventAdmin.class);
+        eventAdminServiceDependency.setRequired(true);
+        eventAdminServiceDependency.setInstanceBound(true);
+        m_component.add(eventAdminServiceDependency);
+    }
 
-                RoutableMessage routableMessage = (RoutableMessage) message;
-                routableMessage.setOriginClusterId(m_clusterId);
-                routableMessage.setOriginMemberId(m_memberId);
+    private void initLogServiceDependency() {
+        ServiceDependency logServiceDependency = 
m_dependencyManager.createServiceDependency();
+        logServiceDependency.setService(LogService.class);
+        logServiceDependency.setRequired(true);
+        logServiceDependency.setInstanceBound(true);
+        m_component.add(logServiceDependency);
+    }
 
-                if (routableMessage.getTargetClusterId() == null) {
-                    routableMessage.setTargetClusterId(m_clusterId);
-                }
-                else {
-                    // FIXME address this
-                    if 
(!routableMessage.getTargetClusterId().equals(getClusterId())) {
-                        m_logService.log(LogService.LOG_ERROR, 
RoutableMessage.class.getSimpleName()
-                                + " is not for this cluster: " + 
routableMessage.getTargetClusterId());
-                    }
-                }
+    /********************************************************
+     * Helper classes
+     ********************************************************/
 
-                ClusterMember clusterMember = null;
-                if (routableMessage.getTargetMemberId() != null) {
-                    clusterMember = 
getClusterMember(routableMessage.getTargetMemberId());
-                    if (clusterMember == null) {
-                        // FIXME address this
-                        m_logService.log(LogService.LOG_ERROR, "RoutedMessage 
specifies unknown target member: "
-                            + routableMessage.getTargetMemberId());
-                    }
-                }
+    class BroadcastEventHandler implements EventHandler {
 
-                if (clusterMember != null) {
-                    doSend(new ClusterMember[] { clusterMember }, message);
+        public void handleEvent(Event event) {
+            Object message = event.getProperty(EVENT_MESSAGE_PROPERTY);
+            if (!(message instanceof RoutableMessage)) {
+                doBroadcast(message);
+                return;
+            }
+            RoutableMessage routableMessage = (RoutableMessage) message;
+            routableMessage.setOriginClusterId(m_clusterId);
+            routableMessage.setOriginMemberId(m_memberId);
+            routableMessage.setTargetClusterId(m_clusterId);
+            if (routableMessage.getTargetMemberId() != null) {
+                ClusterMember clusterMember = 
getClusterMember(routableMessage.getTargetMemberId());
+                if (clusterMember == null) {
+                    m_logService.log(LogService.LOG_ERROR, "RoutedMessage 
specifies unknown target member: "
+                            + routableMessage.toString());
                     return;
                 }
+                doSend(new ClusterMember[] { clusterMember }, message);
+            }
+            else {
+                doBroadcast(message);
             }
-            doBroadcast(message);
         }
     }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
   (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
   Tue Jan 11 10:25:35 2011
@@ -16,24 +16,20 @@
  */
 package org.amdatu.core.fabric.cluster.service.tribes;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Dictionary;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMember;
-import org.amdatu.core.fabric.cluster.ClusterMemberService;
 import org.amdatu.core.fabric.cluster.internal.ClusterMemberImpl;
-import org.amdatu.core.fabric.cluster.internal.tribes.ChannelCreator;
+import org.amdatu.core.fabric.cluster.internal.tribes.ChannelBuilder;
 import org.amdatu.core.fabric.cluster.service.ClusterMemberServiceBase;
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
@@ -41,17 +37,16 @@
 import org.apache.catalina.tribes.ManagedChannel;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.util.Arrays;
 import org.osgi.service.log.LogService;
 
 public final class TribesClusterMemberServiceImpl extends 
ClusterMemberServiceBase {
 
     public static final String CLUSTER_TRIBES_ARGS_PROP = 
"org.amdatu.fabric.cluster.tribes.args";
 
-    private final Map<String, Member> m_memberIdMembers = new HashMap<String, 
Member>();
-    private final ReentrantReadWriteLock m_memberIdMembersLock = new 
ReentrantReadWriteLock();
-
-    private final Set<Member> m_faultyMembers = new HashSet<Member>();
-    private final ReentrantReadWriteLock m_faultyMembersLock = new 
ReentrantReadWriteLock();
+    private final Map<String, Member> m_members = new HashMap<String, 
Member>();
+    private final ReentrantReadWriteLock m_membersLock = new 
ReentrantReadWriteLock();
+    private volatile Member[] m_membersA = new Member[0];
 
     private volatile ManagedChannel m_managedChannel;
 
@@ -59,9 +54,9 @@
      * Constructors
      ********************************************************/
 
-    public TribesClusterMemberServiceImpl(String clusterGroupId, String 
clusterMemberId,
+    public TribesClusterMemberServiceImpl(String clusterGroupId,
         Dictionary<String, Object> properties) {
-        super(clusterGroupId, clusterMemberId, properties);
+        super(clusterGroupId, properties);
     }
 
     /********************************************************
@@ -75,19 +70,23 @@
     }
 
     protected synchronized void onStart() {
+        getLogService().log(LogService.LOG_DEBUG, "Starting managed channel");
         try {
-            getLogService().log(LogService.LOG_DEBUG, "Starting managed 
channel");
             m_managedChannel =
-                (ManagedChannel) ChannelCreator.createChannel((String[]) 
getProperties().get(
-                    CLUSTER_TRIBES_ARGS_PROP));
+                (ManagedChannel) ChannelBuilder
+                    .createChannel(getClusterId(), getProperties());
 
             Properties props = new Properties();
             props.setProperty(SERVICE_CLUSTERCHANNEL_PROPERTY, getClusterId());
             props.setProperty(SERVICE_CLUSTERMEMBER_PROPERTY, getMemberId());
+
+            
m_managedChannel.getMembershipService().setPayload(getPayload(props));
             m_managedChannel.addMembershipListener(new 
TribesMembershipListener());
             m_managedChannel.addChannelListener(new TribesChannelListener());
-            
m_managedChannel.getMembershipService().setPayload(getPayload(props));
             m_managedChannel.start(Channel.DEFAULT);
+
+            Member localMember = 
m_managedChannel.getMembershipService().getLocalMember(false);
+            setMemberId(Arrays.toString(localMember.getUniqueId()));
         }
         catch (Exception e) {
             getLogService().log(LogService.LOG_ERROR, "Exception while 
starting managed channel", e);
@@ -95,8 +94,8 @@
     }
 
     protected synchronized void onStop() {
+        getLogService().log(LogService.LOG_DEBUG, "Stopping managed channel");
         try {
-            getLogService().log(LogService.LOG_DEBUG, "Stopping managed 
channel");
             m_managedChannel.stop(Channel.DEFAULT);
         }
         catch (Exception e) {
@@ -116,24 +115,14 @@
                     + message.toString());
             return;
         }
-        m_memberIdMembersLock.readLock().lock();
-        try {
-            if (m_memberIdMembers.size() == 0) {
-                getLogService().log(LogService.LOG_WARNING,
+        Member[] members = m_membersA;
+        if (members.length == 0) {
+            getLogService().log(LogService.LOG_WARNING,
                         "Dropping message during send because there are no 
active members on my channel: "
                             + message.toString());
-                return;
-            }
-            Member[] members = m_memberIdMembers.values().toArray(new 
Member[m_memberIdMembers.size()]);
-            m_managedChannel.send(members, (Serializable) message, 
Channel.SEND_OPTIONS_ASYNCHRONOUS);
-        }
-        catch (ChannelException e) {
-            getLogService().log(LogService.LOG_ERROR,
-                "Exception during send on managed channel: " + 
message.toString(), e);
-        }
-        finally {
-            m_memberIdMembersLock.readLock().unlock();
+            return;
         }
+        sendToMembers(message, members);
     }
 
     @Override
@@ -145,30 +134,27 @@
             return;
         }
         List<Member> memberList = new LinkedList<Member>();
-        m_memberIdMembersLock.readLock().lock();
+        m_membersLock.readLock().lock();
         try {
             for (ClusterMember clusterMember : clusterMembers) {
-                Member member = 
m_memberIdMembers.get(clusterMember.getMemberId());
+                Member member = m_members.get(clusterMember.getMemberId());
                 if (member != null) {
                     memberList.add(member);
                 }
             }
-            if (memberList.size() == 0) {
-                getLogService().log(LogService.LOG_WARNING,
-                            "Dropping message during send because there are no 
matching members on my channel: "
-                                + message.toString());
-                return;
-            }
-            Member[] members = memberList.toArray(new 
Member[memberList.size()]);
-            m_managedChannel.send(members, (Serializable) message, 
Channel.SEND_OPTIONS_ASYNCHRONOUS);
-        }
-        catch (ChannelException e) {
-            getLogService().log(LogService.LOG_ERROR,
-                "Exception during send on managed channel: " + 
message.toString(), e);
         }
         finally {
-            m_memberIdMembersLock.readLock().unlock();
+            m_membersLock.readLock().unlock();
+        }
+
+        if (memberList.size() == 0) {
+            getLogService().log(LogService.LOG_WARNING,
+                        "Dropping message during send because there are no 
matching members on my channel: "
+                            + message.toString());
+            return;
         }
+        Member[] members = memberList.toArray(new Member[memberList.size()]);
+        sendToMembers(message, members);
     }
 
     /********************************************************
@@ -181,53 +167,45 @@
         return bout.toByteArray();
     }
 
-    private Properties getProperties(byte[] payload) throws IOException {
-        ByteArrayInputStream bin = new ByteArrayInputStream(payload);
-        Properties props = new Properties();
-        props.load(bin);
-        return props;
-    }
-
-    private void addMemberIdMember(String clusterMember, Member member) {
-        m_memberIdMembersLock.writeLock().lock();
+    private void memberAdded(Member member) {
+        String memberId = getMemberId(member);
+        m_membersLock.writeLock().lock();
         try {
-            m_memberIdMembers.put(clusterMember, member);
+            m_members.put(memberId, member);
+            m_membersA = m_members.values().toArray(new 
Member[m_members.size()]);
         }
         finally {
-            m_memberIdMembersLock.writeLock().unlock();
+            m_membersLock.writeLock().unlock();
         }
-        addClusterMember(new ClusterMemberImpl(clusterMember));
+        super.addClusterMember(new ClusterMemberImpl(memberId));
     }
 
-    private void removeMemberIdMember(String clusterMember) {
-        m_memberIdMembersLock.writeLock().lock();
+    private void memberDisappeared(Member member) {
+        String memberId = getMemberId(member);
+        m_membersLock.writeLock().lock();
         try {
-            m_memberIdMembers.remove(clusterMember);
+            m_members.remove(memberId);
+            m_membersA = m_members.values().toArray(new 
Member[m_members.size()]);
         }
         finally {
-            m_memberIdMembersLock.writeLock().unlock();
+            m_membersLock.writeLock().unlock();
         }
-        removeClusterMember(clusterMember);
+        super.removeClusterMember(memberId);
     }
 
-    private void addFaultyMember(Member member) {
-        m_faultyMembersLock.writeLock().lock();
+    private void sendToMembers(Object message, Member[] members) {
         try {
-            m_faultyMembers.add(member);
+            m_managedChannel.send(members, (Serializable) message, 
Channel.SEND_OPTIONS_ASYNCHRONOUS);
+            getLogService().log(LogService.LOG_WARNING, "messageSend: " + 
message.toString());
         }
-        finally {
-            m_faultyMembersLock.writeLock().unlock();
+        catch (ChannelException e) {
+            getLogService().log(LogService.LOG_ERROR,
+                "Exception during send on managed channel: " + 
message.toString(), e);
         }
     }
 
-    private boolean removeFaultyMember(Member member) {
-        m_faultyMembersLock.writeLock().lock();
-        try {
-            return m_faultyMembers.remove(member);
-        }
-        finally {
-            m_faultyMembersLock.writeLock().unlock();
-        }
+    private String getMemberId(Member member) {
+        return Arrays.toString(member.getUniqueId());
     }
 
     /********************************************************
@@ -237,65 +215,31 @@
     class TribesMembershipListener implements MembershipListener {
 
         public void memberAdded(Member member) {
-            try {
-                Properties props = getProperties(member.getPayload());
-                String groupchannel = (String) 
props.get(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY);
-                if (groupchannel == null || 
!groupchannel.equals(getClusterId())) {
-                    getLogService().log(LogService.LOG_ERROR,
-                        "Member joined with invalid channelid! Will ignore it: 
" + member.toString());
-                    addFaultyMember(member);
-                    return;
-                }
-                String channelmember = (String) 
props.get(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY);
-                if (channelmember == null || 
channelmember.equals(getMemberId())
-                    || getClusterMember(channelmember) != null) {
-                    getLogService().log(LogService.LOG_ERROR,
-                        "Member joined with invalid memberid! Will ignore it: 
" + member.toString());
-                    addFaultyMember(member);
-                    return;
-                }
-                addMemberIdMember(channelmember, member);
-                getLogService().log(LogService.LOG_DEBUG, "Member added: " + 
member.toString());
-            }
-            catch (Exception e) {
-                getLogService().log(LogService.LOG_ERROR, "Exception while 
adding member: " + member.toString(), e);
-            }
+            TribesClusterMemberServiceImpl.this.memberAdded(member);
         }
 
         public void memberDisappeared(Member member) {
-            try {
-                if (removeFaultyMember(member)) {
-                    getLogService().log(LogService.LOG_DEBUG, "Faulty member 
disappeared: " + member.toString());
-                    return;
-                }
-                String memberId = 
getProperties(member.getPayload()).getProperty(SERVICE_CLUSTERMEMBER_PROPERTY);
-                removeMemberIdMember(memberId);
-            }
-            catch (Exception e) {
-                getLogService().log(LogService.LOG_ERROR, "Exception while 
removing member: " + member.toString(), e);
-            }
+            TribesClusterMemberServiceImpl.this.memberDisappeared(member);
         }
     }
 
     class TribesChannelListener implements ChannelListener {
 
         public boolean accept(Serializable message, Member member) {
-            m_faultyMembersLock.readLock().lock();
-            try {
-                if (m_faultyMembers.contains(member)) {
-                    getLogService().log(LogService.LOG_WARNING,
-                        "Dropping message recieved from faulty member: " + 
member.toString());
-                    return false;
-                }
-            }
-            finally {
-                m_faultyMembersLock.readLock().unlock();
-            }
             return true;
         }
 
         public void messageReceived(Serializable message, Member member) {
-            dispatchMessage(message);
+            String memberId = getMemberId(member);
+            ClusterMember clusterMember = getClusterMember(memberId);
+            if (clusterMember == null) {
+                getLogService().log(LogService.LOG_ERROR,
+                    "Dropping message for active member: " + 
message.toString());
+                return;
+            }
+            getLogService().log(LogService.LOG_WARNING,
+                "messageRecieved: " + message.toString());
+            dispatchMessage(message, clusterMember);
         }
     }
 }
\ No newline at end of file

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
  (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
  Tue Jan 11 10:25:35 2011
@@ -121,21 +121,28 @@
 
     @Override
     public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("ServiceEndPoint[");
-        sb.append("clusterid=" + getClusterId());
-        sb.append("; memberid=" + getMemberId());
-        sb.append("; objectClass={");
-        for (String part : getObjectClass()) {
-            sb.append(" " + part);
+        StringBuilder sb = new 
StringBuilder(ServiceEndPoint.class.getSimpleName() + "{");
+        sb.append("\n\tclusterid=" + getClusterId());
+        sb.append("\n\tmemberid=" + getMemberId());
+        sb.append("\n\tobjectClass=[");
+        for (int i = 0; i < getObjectClass().length; i++) {
+            if (i > 0)
+                sb.append(",");
+            sb.append(getObjectClass()[i]);
         }
-        sb.append(" }; properties={");
+        sb.append("]");
+        sb.append("\n\tproperties=[");
         Enumeration<String> enumeration = getProperties().keys();
+        boolean first = true;
         while (enumeration.hasMoreElements()) {
             String key = (String) enumeration.nextElement();
-            sb.append(" " + key + ":" + getProperties().get(key).toString());
+            if (!first)
+                sb.append(",");
+            sb.append(key + ":" + getProperties().get(key).toString());
+            first = false;
         }
-        sb.append("}]");
+        sb.append("\n\t]");
+        sb.append("\n}");
         return sb.toString();
     }
 

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
      (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
      Tue Jan 11 10:25:35 2011
@@ -1,3 +1,19 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
 package org.amdatu.core.fabric.remote.internal;
 
 import org.amdatu.core.fabric.remote.DiscoveryService;

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
        (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
        Tue Jan 11 10:25:35 2011
@@ -16,15 +16,17 @@
  */
 package org.amdatu.core.fabric.remote.internal;
 
-import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.amdatu.core.fabric.cluster.LocalTopicMessage;
 import org.amdatu.core.fabric.cluster.RoutableMessage;
 import org.amdatu.core.fabric.remote.ServiceEndPoint;
 
-public class EndpointDepublishMessage extends RoutableMessage implements 
LocalTopicMessage, Serializable {
+public class EndpointDepublishMessage extends RoutableMessage implements 
LocalTopicMessage {
 
     private static final long serialVersionUID = 1L;
+    private static final transient Pattern p = Pattern.compile("\n");
 
     private final ServiceEndPoint m_serviceEndPoint;
 
@@ -43,4 +45,15 @@
         return 
DiscoveryUtilities.getLocalDiscoveryTopic(m_serviceEndPoint.getClusterId(),
             m_serviceEndPoint.getServiceGroup());
     }
+
+    /** Renders the message as an indented multi-line string; nested renderings are pushed one tab deeper. */
+    @Override
+    public String toString() {
+        String parentText = p.matcher(super.toString()).replaceAll("\n\t");
+        String endPointText = p.matcher(m_serviceEndPoint.toString()).replaceAll("\n\t");
+        return EndpointDepublishMessage.class.getSimpleName() + "{"
+            + "\n\tsuper=" + parentText
+            + "\n\tserviceEndPoint=" + endPointText
+            + "\n}";
+    }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
        (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
        Tue Jan 11 10:25:35 2011
@@ -16,18 +16,22 @@
  */
 package org.amdatu.core.fabric.remote.internal;
 
-import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.RoutableMessage;
 
-public class EndpointDiscoveryMessage implements LocalTopicMessage, 
Serializable {
+public class EndpointDiscoveryMessage extends RoutableMessage implements 
LocalTopicMessage {
 
     private static final long serialVersionUID = 1L;
+    private static final transient Pattern p = Pattern.compile("\n");
 
     private final String m_clusterId;
     private final String m_serviceGroup;
 
     public EndpointDiscoveryMessage(String clusterId, String serviceGroup) {
+        super(clusterId, null, serviceGroup);
         m_clusterId = clusterId;
         m_serviceGroup = serviceGroup;
     }
@@ -36,4 +40,16 @@
         return DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterId,
             m_serviceGroup);
     }
+
+    /** Renders the message as an indented multi-line string: the superclass rendering, then the discovery fields. */
+    @Override
+    public String toString() {
+        String parentText = p.matcher(super.toString()).replaceAll("\n\t");
+        return EndpointDiscoveryMessage.class.getSimpleName() + "{"
+            + "\n\tsuper=" + parentText
+            + "\n\tclusterId=" + m_clusterId
+            + "\n\tmemberId=" + getOriginMemberId()
+            + "\n\tserviceGroup=" + m_serviceGroup
+            + "\n}";
+    }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
   (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
   Tue Jan 11 10:25:35 2011
@@ -17,6 +17,8 @@
 package org.amdatu.core.fabric.remote.internal;
 
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.amdatu.core.fabric.cluster.LocalTopicMessage;
 import org.amdatu.core.fabric.cluster.RoutableMessage;
@@ -25,6 +27,7 @@
 public class EndpointInvokeMessage extends RoutableMessage implements 
LocalTopicMessage {
 
     private static final long serialVersionUID = 1L;
+    private static final transient Pattern p = Pattern.compile("\n");
 
     private final ServiceEndPoint m_serviceEndPoint;
     private final Map<String, Object> m_payload;
@@ -47,4 +50,16 @@
         return DistributionUtilities.getLocalInvokeTopic(getTargetClusterId(),
             getTargetServiceGroup());
     }
+
+    /** Renders the message as an indented multi-line string; a missing payload is printed as "null". */
+    @Override
+    public String toString() {
+        String parentText = p.matcher(super.toString()).replaceAll("\n\t");
+        String endPointText = p.matcher(m_serviceEndPoint.toString()).replaceAll("\n\t");
+        return EndpointInvokeMessage.class.getSimpleName() + "{"
+            + "\n\tsuper=" + parentText
+            + "\n\tserviceEndPoint=" + endPointText
+            + "\n\tpayload=" + m_payload
+            + "\n}";
+    }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
  (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
  Tue Jan 11 10:25:35 2011
@@ -16,6 +16,9 @@
  */
 package org.amdatu.core.fabric.remote.internal;
 
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.amdatu.core.fabric.cluster.LocalTopicMessage;
 import org.amdatu.core.fabric.cluster.RoutableMessage;
 import org.amdatu.core.fabric.remote.ServiceEndPoint;
@@ -23,6 +26,7 @@
 public class EndpointPublishMessage extends RoutableMessage implements 
LocalTopicMessage {
 
     private static final long serialVersionUID = 1L;
+    private static final transient Pattern p = Pattern.compile("\n");
 
     private ServiceEndPoint m_serviceEndPoint;
 
@@ -41,4 +45,15 @@
         return 
DiscoveryUtilities.getLocalDiscoveryTopic(m_serviceEndPoint.getClusterId(),
             m_serviceEndPoint.getServiceGroup());
     }
+
+    /** Renders the message as an indented multi-line string; nested renderings are pushed one tab deeper. */
+    @Override
+    public String toString() {
+        String parentText = p.matcher(super.toString()).replaceAll("\n\t");
+        String endPointText = p.matcher(m_serviceEndPoint.toString()).replaceAll("\n\t");
+        return EndpointPublishMessage.class.getSimpleName() + "{"
+            + "\n\tsuper=" + parentText
+            + "\n\tserviceEndPoint=" + endPointText
+            + "\n}";
+    }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
 (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
 Tue Jan 11 10:25:35 2011
@@ -17,6 +17,8 @@
 package org.amdatu.core.fabric.remote.internal;
 
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.amdatu.core.fabric.cluster.LocalTopicMessage;
 import org.amdatu.core.fabric.cluster.RoutableMessage;
@@ -24,6 +26,7 @@
 public class EndpointResponseMessage extends RoutableMessage implements 
LocalTopicMessage {
 
     private static final long serialVersionUID = 1L;
+    private static final transient Pattern p = Pattern.compile("\n");
 
     private Map<String, Object> m_payload;
 
@@ -42,4 +45,14 @@
         return 
DistributionUtilities.getLocalResponseTopic(getTargetClusterId(),
             getTargetServiceGroup());
     }
+
+    /** Renders the message as an indented multi-line string; a missing payload is printed as "null". */
+    @Override
+    public String toString() {
+        String parentText = p.matcher(super.toString()).replaceAll("\n\t");
+        return EndpointResponseMessage.class.getSimpleName() + "{"
+            + "\n\tsuper=" + parentText
+            + "\n\tpayload=" + m_payload
+            + "\n}";
+    }
 }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
   (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
   Tue Jan 11 10:25:35 2011
@@ -68,6 +68,8 @@
     private final String m_clusterGroupId;
     private final String m_serviceGroupId;
 
+    private final String m_clusterChannelInvokeTopic;
+
     /********************************************************
      * Constructors
      ********************************************************/
@@ -83,6 +85,7 @@
                 m_serviceInterfaceMethods.add(serviceMethod);
             }
         }
+        m_clusterChannelInvokeTopic = 
ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId);
     }
 
     /********************************************************
@@ -126,7 +129,7 @@
     }
 
     public synchronized void stop() {
-        m_logService.log(LogService.LOG_WARNING, "Starting 
LocalServiceInvocationHandler");
+        m_logService.log(LogService.LOG_WARNING, "Stopping 
LocalServiceInvocationHandler");
         m_dependencyManager.remove(m_serviceInvocationEventHandlerComponent);
     }
 
@@ -145,9 +148,7 @@
 
             Dictionary<String, Object> eventPayload = new Hashtable<String, 
Object>();
             eventPayload.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, 
message);
-            Event event =
-                new 
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
-                    eventPayload);
+            Event event = new Event(m_clusterChannelInvokeTopic, eventPayload);
             m_eventAdmin.postEvent(event);
 
             Object response = retrieveInvocationResponse(invocationIdentifier);

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
     (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
     Tue Jan 11 10:25:35 2011
@@ -20,11 +20,15 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Hashtable;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import org.amdatu.core.fabric.cluster.MemberAddedEvent;
+import org.amdatu.core.fabric.cluster.MemberRemovedEvent;
 import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
 import org.amdatu.core.fabric.remote.DiscoveryService;
 import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
@@ -126,7 +130,8 @@
 
         Dictionary<String, Object> eventHandlerProps = new Hashtable<String, 
Object>();
         eventHandlerProps.put(EventConstants.EVENT_TOPIC,
-            new String[] { 
DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterGroupId, m_serviceGroupId) 
});
+            new String[] { 
DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterGroupId, m_serviceGroupId),
+                
ClusterMemberUtilities.getClusterChannelReceiveTopic(m_clusterGroupId) });
         m_discoveryEventHandlerComponent = 
m_dependencyManager.createComponent();
         
m_discoveryEventHandlerComponent.setInterface(EventHandler.class.getName(), 
eventHandlerProps);
         m_discoveryEventHandlerComponent.setImplementation(new 
DiscoveryEventHandler());
@@ -139,7 +144,7 @@
     public synchronized void start() {
         m_logService.log(LogService.LOG_INFO, "Starting " + toString());
         m_dependencyManager.add(m_discoveryEventHandlerComponent);
-        m_evenAdmin.postEvent(createDiscoveryEvent());
+        m_evenAdmin.postEvent(createDiscoveryEvent(null));
     }
 
     public synchronized void stop() {
@@ -159,7 +164,7 @@
         try {
             if (!m_remotableEndPoints.contains(serviceEndPoint)) {
                 m_remotableEndPoints.add(serviceEndPoint);
-                
m_evenAdmin.postEvent(createEndpointPublishEvent(serviceEndPoint));
+                
m_evenAdmin.postEvent(createEndpointPublishEvent(serviceEndPoint, null));
             }
             else {
                 throw new IllegalStateException("Unexpected state... needs 
analysis");
@@ -194,7 +199,15 @@
      * Private methods
      ********************************************************/
 
-    private Event createDiscoveryEvent() {
+    /** Builds the discovery event; a non-null memberAddedEvent sets that member as the message target. */
+    private Event createDiscoveryEvent(MemberAddedEvent memberAddedEvent) {
+        EndpointDiscoveryMessage endpointDiscoveryMessage =
+            new EndpointDiscoveryMessage(m_clusterGroupId, m_serviceGroupId);
+        if (memberAddedEvent != null) {
+            endpointDiscoveryMessage.setTargetClusterId(m_clusterGroupId);
+            endpointDiscoveryMessage.setTargetMemberId(memberAddedEvent.getMemberId());
+        }
+
         Dictionary<String, Object> eventProps = new Hashtable<String, Object>();
         eventProps.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY,
-            new EndpointDiscoveryMessage(m_clusterGroupId, m_serviceGroupId));
+            endpointDiscoveryMessage);
@@ -204,9 +217,17 @@
         return discoveryEvent;
     }
 
-    private Event createEndpointPublishEvent(ServiceEndPoint serviceEndPoint) {
+    private Event createEndpointPublishEvent(ServiceEndPoint serviceEndPoint,
+        EndpointDiscoveryMessage endpointDiscoveryMessage) {
         Dictionary<String, Object> props = new Hashtable<String, Object>();
-        props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, new 
EndpointPublishMessage(serviceEndPoint));
+        EndpointPublishMessage endpointPublishMessage = new 
EndpointPublishMessage(serviceEndPoint);
+
+        if (endpointDiscoveryMessage != null) {
+            
endpointPublishMessage.setTargetClusterId(endpointDiscoveryMessage.getOriginClusterId());
+            
endpointPublishMessage.setTargetMemberId(endpointDiscoveryMessage.getOriginMemberId());
+        }
+
+        props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, 
endpointPublishMessage);
         Event event =
             new 
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
                 props);
@@ -239,7 +260,7 @@
         m_remotableEndPointsLock.writeLock().lock();
         try {
             for (ServiceEndPoint serviceEndPoint : m_remotableEndPoints) {
-                Event event = createEndpointPublishEvent(serviceEndPoint);
+                Event event = createEndpointPublishEvent(serviceEndPoint, 
endpointDiscoveryMessage);
                 m_evenAdmin.postEvent(event);
             }
         }
@@ -279,6 +300,8 @@
                 m_clusterGroupId);
             distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
                 m_serviceGroupId);
+            
distributionProps.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY,
+                serviceEndPoint.getMemberId());
             serviceComponent =
                 m_dependencyManager.createComponent()
                     .setInterface(RemoteServiceEndPoint.class.getName(), 
distributionProps)
@@ -300,6 +323,43 @@
         return;
     }
 
+    /** Posts the discovery event built for a member that joined the cluster. */
+    private void recieveMemberAddedEvent(final MemberAddedEvent memberAddedEvent) {
+        m_logService.log(LogService.LOG_INFO, "Recieved \n" + memberAddedEvent.toString());
+        m_evenAdmin.postEvent(createDiscoveryEvent(memberAddedEvent));
+    }
+
+    /**
+     * Drops every remote endpoint component registered for a member that left
+     * the cluster and removes those components from the dependency manager.
+     * @param memberRemovedEvent identifies the member that left
+     */
+    private void recieveMemberRemovedEvent(final MemberRemovedEvent memberRemovedEvent) {
+        m_logService.log(LogService.LOG_INFO, "Recieved \n" + memberRemovedEvent.toString());
+        List<ServiceEndPoint> staleEndPoints = new LinkedList<ServiceEndPoint>();
+        List<Component> staleComponents = new LinkedList<Component>();
+        m_remoteEndPointComponentsLock.writeLock().lock();
+        try {
+            // Collect first so the map is not modified while its key set is being iterated.
+            for (ServiceEndPoint serviceEndPoint : m_remoteEndPointComponents.keySet()) {
+                if (serviceEndPoint.getMemberId().equals(memberRemovedEvent.getMemberId())) {
+                    staleEndPoints.add(serviceEndPoint);
+                }
+            }
+            for (ServiceEndPoint serviceEndPoint : staleEndPoints) {
+                m_logService.log(LogService.LOG_INFO, "Removing\n" + serviceEndPoint.toString());
+                staleComponents.add(m_remoteEndPointComponents.remove(serviceEndPoint));
+            }
+        }
+        finally {
+            m_remoteEndPointComponentsLock.writeLock().unlock();
+        }
+        // The dependency manager is only called once the write lock has been released.
+        for (Component serviceComponent : staleComponents) {
+            m_dependencyManager.remove(serviceComponent);
+        }
+    }
+
     /********************************************************
      * Helper classes
      ********************************************************/
@@ -320,6 +380,14 @@
                 recieveEndpointDiscoveryMessage((EndpointDiscoveryMessage) 
message);
                 return;
             }
+            if (message instanceof MemberAddedEvent) {
+                recieveMemberAddedEvent((MemberAddedEvent) message);
+                return;
+            }
+            if (message instanceof MemberRemovedEvent) {
+                recieveMemberRemovedEvent((MemberRemovedEvent) message);
+                return;
+            }
             throw new IllegalStateException("Unknown message type " + 
message.getClass().getName() + "on channel "
                 + DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterGroupId, 
m_serviceGroupId));
         }

Modified: 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
==============================================================================
--- 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
        (original)
+++ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
        Tue Jan 11 10:25:35 2011
@@ -99,7 +99,7 @@
                     properties
                         
.put(TribesClusterMemberServiceImpl.CLUSTER_TRIBES_ARGS_PROP, args.split(" "));
 
-                    createClusterMember(clusterGroupId, clusterMemberId, 
properties);
+                    createClusterChannel(clusterGroupId, properties);
                 }
             }
 
@@ -125,26 +125,19 @@
      * FabricManagerService methods
      ********************************************************/
 
-    public boolean createClusterMember(String clusterGroupId, String 
clusterMemberId,
-        Dictionary<String, Object> properties) {
-        if (!ClusterMemberUtilities.isValidChannelName(clusterGroupId)) {
-            m_logService.log(LogService.LOG_ERROR, "Failed to create cluster. 
Invalid clustername: " + clusterGroupId);
-            return false;
-        }
-        if (!ClusterMemberUtilities.isValidMemberName(clusterMemberId)) {
-            m_logService.log(LogService.LOG_ERROR, "Failed to create cluster. 
Invalid membername: " + clusterMemberId);
+    public boolean createClusterChannel(String clusterChannelName, 
Dictionary<String, Object> clusterChannelProperties) {
+        if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+            m_logService.log(LogService.LOG_ERROR, "Failed to create 
clusterchannel. Invalid clusterChannelName: "
+                + clusterChannelName);
             return false;
         }
 
-        Dictionary<String, Object> svcProperties = new Hashtable<String, 
Object>();
-// svcProperties.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY, 
clusterMemberId);
-// svcProperties.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY, 
clusterMemberId);
-
         Component clusterMemberComponent =
             m_dependencyManager
                 .createComponent()
-                .setInterface(ClusterMemberService.class.getName(), properties)
-                .setFactory(new ClusterMemberServiceFactory(clusterGroupId, 
clusterMemberId, properties), "getInstance")
+                .setInterface(ClusterMemberService.class.getName(), 
clusterChannelProperties)
+                .setFactory(new 
ClusterMemberServiceFactory(clusterChannelName, clusterChannelProperties),
+                    "getInstance")
                 .add(
                     m_dependencyManager.createServiceDependency()
                         .setService(FabricManagerService.class)
@@ -152,11 +145,12 @@
 
         m_clusterMemberComponentsLock.writeLock().lock();
         try {
-            if (m_clusterMemberComponents.containsKey(getKey(clusterGroupId, 
clusterMemberId, ""))) {
+            if (m_clusterMemberComponents.containsKey(clusterChannelName)) {
                 m_dependencyManager.remove(m_clusterMemberComponents
-                    .remove(getKey(clusterGroupId, clusterMemberId, "")));
+                    .remove(clusterChannelName));
+
             }
-            m_clusterMemberComponents.put(getKey(clusterGroupId, 
clusterMemberId, ""), clusterMemberComponent);
+            m_clusterMemberComponents.put(clusterChannelName, 
clusterMemberComponent);
             m_dependencyManager.add(clusterMemberComponent);
             return true;
         }
@@ -165,16 +159,17 @@
         }
     }
 
-    public boolean removeClusterMember(String clusterGroupId, String 
clusterMemberId) {
-        if (clusterMemberId == null || "".equals(clusterMemberId)) {
+    public boolean removeClusterChannel(String clusterGroupId) {
+        if (!ClusterMemberUtilities.isValidChannelName(clusterGroupId)) {
+            m_logService.log(LogService.LOG_ERROR, "Failed to remove 
clusterchannel. Invalid clusterChannelName: "
+                + clusterGroupId);
             return false;
         }
 
         m_clusterMemberComponentsLock.writeLock().lock();
         try {
-            if (m_clusterMemberComponents.containsKey(getKey(clusterGroupId, 
clusterMemberId, ""))) {
-                m_dependencyManager.remove(m_clusterMemberComponents
-                    .remove(getKey(clusterGroupId, clusterMemberId, "")));
+            if (m_clusterMemberComponents.containsKey(clusterGroupId)) {
+                
m_dependencyManager.remove(m_clusterMemberComponents.remove(clusterGroupId));
                 return true;
             }
             return false;
@@ -184,15 +179,25 @@
         }
     }
 
-    public boolean createDiscovery(String clusterGroupId, String 
serviceGroupId) {
-        // FIXME improve checking
+    public boolean createDiscovery(String clusterChannelName, String 
serviceGroupName) {
+        if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+            m_logService
+                .log(LogService.LOG_ERROR, "Failed to create discovery. 
Invalid clusterChannelName: "
+                    + clusterChannelName);
+            return false;
+        }
+        if (!ClusterMemberUtilities.isValidChannelName(serviceGroupName)) {
+            m_logService.log(LogService.LOG_ERROR, "Failed to create 
discovery. Invalid serviceGroupName: "
+                + serviceGroupName);
+            return false;
+        }
 
         Component discoveryComponent =
             m_dependencyManager
                 .createComponent()
                 .setInterface(DiscoveryService.class.getName(),
                     null)
-                .setFactory(new DiscoveryServiceFactory(clusterGroupId, 
serviceGroupId), "getInstance")
+                .setFactory(new DiscoveryServiceFactory(clusterChannelName, 
serviceGroupName), "getInstance")
                 .add(
                     m_dependencyManager.createServiceDependency()
                         .setService(FabricManagerService.class)
@@ -200,11 +205,11 @@
 
         m_discoveryComponentsLock.writeLock().lock();
         try {
-            if (m_discoveryComponents.containsKey(getKey(clusterGroupId, "", 
serviceGroupId))) {
-                
m_dependencyManager.remove(m_discoveryComponents.remove(getKey(clusterGroupId, 
"",
-                    serviceGroupId)));
+            if (m_discoveryComponents.containsKey(getKey(clusterChannelName, 
serviceGroupName))) {
+                
m_dependencyManager.remove(m_discoveryComponents.remove(getKey(clusterChannelName,
+                    serviceGroupName)));
             }
-            m_discoveryComponents.put(getKey(clusterGroupId, "", 
serviceGroupId), discoveryComponent);
+            m_discoveryComponents.put(getKey(clusterChannelName, 
serviceGroupName), discoveryComponent);
             m_dependencyManager.add(discoveryComponent);
             return true;
         }
@@ -213,14 +218,23 @@
         }
     }
 
-    public boolean removeDiscovery(String clusterGroupId, String 
serviceGroupId) {
-        // FIXME improve checks
-
+    public boolean removeDiscovery(String clusterChannelName, String 
serviceGroupName) {
+        if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+            m_logService
+                .log(LogService.LOG_ERROR, "Failed to remove discovery. 
Invalid clusterChannelName: "
+                    + clusterChannelName);
+            return false;
+        }
+        if (!ClusterMemberUtilities.isValidChannelName(serviceGroupName)) {
+            m_logService.log(LogService.LOG_ERROR, "Failed to remove 
discovery. Invalid serviceGroupName: "
+                + serviceGroupName);
+            return false;
+        }
         m_discoveryComponentsLock.writeLock().lock();
         try {
-            if (m_discoveryComponents.containsKey(getKey(clusterGroupId, "", 
serviceGroupId))) {
-                
m_dependencyManager.remove(m_discoveryComponents.remove(getKey(clusterGroupId, 
"",
-                    serviceGroupId)));
+            if (m_discoveryComponents.containsKey(getKey(clusterChannelName, 
serviceGroupName))) {
+                
m_dependencyManager.remove(m_discoveryComponents.remove(getKey(clusterChannelName,
+                    serviceGroupName)));
                 return true;
             }
             return false;
@@ -230,14 +244,22 @@
         }
     }
 
-    public boolean createDistribution(String clusterGroupId, String 
serviceGroupId) {
-        // FIXME improve checks
-
+    /** Creates the distribution component for the given channel and service group; returns false when a name is invalid. */
+    public boolean createDistribution(String clusterChannelName, String serviceGroupName) {
+        if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+            m_logService.log(LogService.LOG_ERROR, "Failed to create distribution. Invalid clustername: " + clusterChannelName);
+            return false;
+        }
+        if (!ClusterMemberUtilities.isValidChannelName(serviceGroupName)) {
+            m_logService.log(LogService.LOG_ERROR, "Failed to create distribution. Invalid servicegroupname: "
+                + serviceGroupName);
+            return false;
+        }
         Component distributionComponent =
             m_dependencyManager
                 .createComponent()
                 .setInterface(DistributionService.class.getName(), null)
-                .setFactory(new DistributionServiceFactory(clusterGroupId, 
serviceGroupId), "getInstance")
+                .setFactory(new DistributionServiceFactory(clusterChannelName, 
serviceGroupName), "getInstance")
                 .add(
                     m_dependencyManager.createServiceDependency()
                         .setService(FabricManagerService.class)
@@ -245,11 +267,11 @@
 
         m_distributionComponentsLock.writeLock().lock();
         try {
-            if (m_distributionComponents.containsKey(getKey(clusterGroupId, "", serviceGroupId))) {
-                m_dependencyManager.remove(m_distributionComponents.remove(getKey(clusterGroupId, "",
-                    serviceGroupId)));
+            if (m_distributionComponents.containsKey(getKey(clusterChannelName, serviceGroupName))) {
+                m_dependencyManager.remove(m_distributionComponents.remove(getKey(clusterChannelName,
+                    serviceGroupName)));
             }
-            m_distributionComponents.put(getKey(clusterGroupId, "", serviceGroupId),
+            m_distributionComponents.put(getKey(clusterChannelName, serviceGroupName),
                 distributionComponent);
             m_dependencyManager.add(distributionComponent);
             return true;
@@ -259,14 +281,23 @@
         }
     }
 
-    public boolean removeDistribution(String clusterGroupId, String serviceGroupId) {
-        // TODO improve checks
-
+    public boolean removeDistribution(String clusterChannelName, String serviceGroupName) {
+        if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+            m_logService
+                .log(LogService.LOG_ERROR, "Failed to remove distribution. Invalid clusterChannelname: "
+                    + clusterChannelName);
+            return false;
+        }
+        if (!ClusterMemberUtilities.isValidChannelName(serviceGroupName)) {
+            m_logService.log(LogService.LOG_ERROR, "Failed to remove distribution. Invalid serviceGroupName: "
+                + serviceGroupName);
+            return false;
+        }
         m_distributionComponentsLock.writeLock().lock();
         try {
-            if (m_distributionComponents.containsKey(getKey(clusterGroupId, "", serviceGroupId))) {
-                m_dependencyManager.remove(m_distributionComponents.remove(getKey(clusterGroupId, "",
-                    serviceGroupId)));
+            if (m_distributionComponents.containsKey(getKey(clusterChannelName, serviceGroupName))) {
+                m_dependencyManager.remove(m_distributionComponents.remove(getKey(clusterChannelName,
+                    serviceGroupName)));
                 return true;
             }
             return false;
@@ -280,8 +311,8 @@
      * Private methods
      ********************************************************/
 
-    private String getKey(String clusterGroupId, String clusterMemberId, String serviceGroupId) {
-        return clusterGroupId + "#" + clusterMemberId + "#" + serviceGroupId;
+    private String getKey(String clusterChannelName, String serviceGroupName) {
+        return clusterChannelName + "#" + serviceGroupName;
     }
 
     /********************************************************
@@ -290,49 +321,47 @@
 
     static class ClusterMemberServiceFactory {
 
-        private final String m_clusterGroupId;
-        private final String m_clusterMemberId;
-        private final Dictionary<String, Object> m_properties;
-
-        public ClusterMemberServiceFactory(String clusterGroupId, String clusterMemberId,
-            Dictionary<String, Object> properties) {
-            m_clusterGroupId = clusterGroupId;
-            m_clusterMemberId = clusterMemberId;
-            m_properties = properties;
+        private final String m_clusterChannelName;
+        private final Dictionary<String, Object> m_clusterChannelProperties;
+
+        public ClusterMemberServiceFactory(String clusterChannelName,
+            Dictionary<String, Object> clusterChannelProperties) {
+            m_clusterChannelName = clusterChannelName;
+            m_clusterChannelProperties = clusterChannelProperties;
         }
 
         public ClusterMemberService getInstance() {
-            return new TribesClusterMemberServiceImpl(m_clusterGroupId, m_clusterMemberId, m_properties);
+            return new TribesClusterMemberServiceImpl(m_clusterChannelName, m_clusterChannelProperties);
         }
     }
 
     static class DiscoveryServiceFactory {
 
-        private final String m_clusterGroupId;
-        private final String m_serviceGroupId;
+        private final String m_clusterChannelName;
+        private final String m_serviceGroupName;
 
-        public DiscoveryServiceFactory(String clusterGroupId, String serviceGroupId) {
-            m_clusterGroupId = clusterGroupId;
-            m_serviceGroupId = serviceGroupId;
+        public DiscoveryServiceFactory(String clusterChannelName, String serviceGroupName) {
+            m_clusterChannelName = clusterChannelName;
+            m_serviceGroupName = serviceGroupName;
         }
 
         public DiscoveryService getInstance() {
-            return new DiscoveryServiceImpl(m_clusterGroupId, m_serviceGroupId);
+            return new DiscoveryServiceImpl(m_clusterChannelName, m_serviceGroupName);
         }
     }
 
     static class DistributionServiceFactory {
 
-        private final String m_clusterGroupId;
-        private final String m_serviceGroupId;
+        private final String m_clusterChannelName;
+        private final String m_serviceGroupName;
 
-        public DistributionServiceFactory(String clusterGroupId, String serviceGroupId) {
-            m_clusterGroupId = clusterGroupId;
-            m_serviceGroupId = serviceGroupId;
+        public DistributionServiceFactory(String clusterChannelName, String serviceGroupName) {
+            m_clusterChannelName = clusterChannelName;
+            m_serviceGroupName = serviceGroupName;
         }
 
         public DistributionService getInstance() {
-            return new DistributionServiceImpl(m_clusterGroupId, m_serviceGroupId);
+            return new DistributionServiceImpl(m_clusterChannelName, m_serviceGroupName);
         }
     }
 }

Reply via email to