Author: bdekruijff at gmail.com
Date: Mon Jan 3 17:18:44 2011
New Revision: 555
Log:
[sandbox] servicegroups / depman / javadoc / errorchecking and more
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberUtilities.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
- copied, changed from r553,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/ChannelMemberUtilitiesTest.java
Removed:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
Mon Jan 3 17:18:44 2011
@@ -16,11 +16,90 @@
*/
package org.amdatu.core.fabric.cluster;
+/**
+ * Service interface for the Amdatu Fabric clustering and communication
+ * component. A service registration for this component manages the
+ * lifecycle for a specific unique <code>ClusterChannel</code> that
+ * provides discovery and message based communication with other Amdatu
+ * nodes on the same channel. A service instance has a unique
+ * <code>ClusterChannel</code> and a <code>ChannelMember</code> property
+ * unique to that channel. These properties are published as service
+ * registration properties along with any additional configuration that
+ * may be specific to the underlying implementation.
+ * <p/>
+ * All interaction with a <code>ClusterMemberService</code> is asynchronous
+ * and flows through the OSGi <code>EventAdmin</code> service. Messages
+ * can be posted to the clusterchannel specific SEND topic for delivery
+ * to the cluster. Messages received from the cluster will by default
+ * be posted on the clusterchannel specific RECIEVE topic. If the message
+ * class implements the <code>LocalTopicMessage</code> interface the
+ * topic specified by the message itself takes precedence.
+ * <p/>
+ * For optimization purposes basic message routing is supported through
+ * the <code>RoutableMessage</code> base class. Any message object extending
+ * this class can provide optional target information. In addition it will
+ * have origin information injected upon sending providing the receiving
+ * end with the information required to route a message back.
+ * <p/>
+ * Sample service registration:
+ * <pre>
+ * objectClass = org.amdatu.core.fabric.cluster.ClusterMemberService
+ * org.amdatu.fabric.cluster.CLUSTERGROUP = cluster-1
+ * org.amdatu.fabric.cluster.CLUSTERMEMBER = member-1
+ * org.amdatu.fabric.cluster.tribes.args = -port, 8881, -mport, 8888
+ * service.id = 922
+ * </pre>
+ *
+ */
public interface ClusterMemberService {
- String CLUSTER_CLUSTERGROUP_PROP =
"org.amdatu.fabric.cluster.CLUSTERGROUP";
- String CLUSTER_CLUSTERMEMBER_PROP =
"org.amdatu.fabric.cluster.CLUSTERMEMBER";
+ /**
+ * OSGi service registration property name that is used to publish the
+ * clusterchannel <code>String</code> value for this service. This may be
+ * used for service selection.
+ */
+ String SERVICE_CLUSTERCHANNEL_PROPERTY =
"org.amdatu.fabric.cluster.CLUSTERGROUP";
+
+ /**
+ * OSGi service registration property name that is used to publish the
+ * clustermember <code>String</code> value for this service. This may be
+ * used for service selection.
+ */
+ String SERVICE_CLUSTERMEMBER_PROPERTY =
"org.amdatu.fabric.cluster.CLUSTERMEMBER";
+
+ /**
+ * OSGi service registration property name that is used to publish the
+ * configuration properties <code>Dictionary&lt;String, Object&gt;</code>
+ * value for this service. This may be used for service selection.
+ */
+ String SERVICE_CONFIGURATION_PROPERTY =
"org.amdatu.fabric.cluster.CONFIGURATION";
+
+ /**
+ * <code>String</code> value to be replaced by the appropriate
+ * clusterchannel value in topic templates.
+ */
+ String EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER =
"[ClusterChannel]";
+
+ /**
+ * <code>EventAdmin</code> topic template to which message events that
+ * should be broadcast over the <code>ClusterChannelService</code> are posted. Note
+ * that <code>[ClusterChannel]</code> should be replaced by the appropriate
+ * clusterchannel value.
+ */
+ String EVENT_SEND_TOPIC_TEMPLATE = "org/amdatu/fabric/cluster/SEND/"
+ + EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER;
+
+ /**
+ * <code>EventAdmin</code> topic template that messages received by the
+ * <code>ClusterChannelService</code> are posted to as events. Note that
+ * <code>[ClusterChannel]</code> should be replaced by the appropriate
+ * clusterchannel value.
+ */
+ String EVENT_RECIEVE_TOPIC_TEMPLATE = "org/amdatu/fabric/cluster/RECIEVE/"
+ + EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER;
- String EVENT_TOPIC_BROADCAST = "org/amdatu/fabric/cluster/BROADCAST";
+ /**
+ * <code>Event</code> properties key where messages are (to be) stored in
+ * events posted to or received by the <code>ClusterMemberService</code>.
+ */
String EVENT_MESSAGE_PROPERTY = "org/amdatu/fabric/cluster/MESSAGE";
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java
Mon Jan 3 17:18:44 2011
@@ -17,9 +17,17 @@
package org.amdatu.core.fabric.cluster;
/**
- * Interface for messages that want to specify the EventAdmin topic that the
- * ClusterMemberService should post them on at the receiver end.
+ * Interface for messages that want to specify the <code>EventAdmin</code>
+ * topic that the <code>ClusterMemberService</code> should post them on
+ * at the receiver end.
*/
public interface LocalTopicMessage {
+
+ /**
+ * Returns a valid <code>EventAdmin</code> topic that consumers may
+ * use to receive this message.
+ *
+ * @return the topic
+ */
String getLocalTopic();
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
Mon Jan 3 17:18:44 2011
@@ -1,13 +1,31 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
package org.amdatu.core.fabric.cluster;
import java.io.Serializable;
/**
- * Base class for messages that want to enable routing support. Target
information may
- * be set to avoid a cluster wide broadcast. The origin information will be
injected by
- * the ClusterMemberService upon broadcast/send.
+ * Base class for messages that want to enable
+ * <code>ClusterMemberService</code> routing support. Target information
+ * may be set to avoid a cluster-wide broadcast. The origin information
+ * will be injected by the <code>ClusterMemberService</code> upon sending.
*
* FIXME ServiceGroup is not a cluster concept
+ * FIXME reduce member names to save bytes?
*/
public abstract class RoutableMessage implements Serializable {
@@ -79,5 +97,4 @@
public final void setTargetServiceGroup(String serviceGroup) {
m_targetServiceGroup = serviceGroup;
}
-
}
\ No newline at end of file
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberUtilities.java
==============================================================================
--- (empty file)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberUtilities.java
Mon Jan 3 17:18:44 2011
@@ -0,0 +1,49 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.cluster.internal;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.amdatu.core.fabric.cluster.ClusterMemberService;
+
+public final class ClusterMemberUtilities {
+
+ private final static Pattern m_validNamePattern =
Pattern.compile("[a-zA-Z0-9-_]+");
+
+ public static String getClusterChannelSendTopic(final String
clusterChannel) {
+ return ClusterMemberService.EVENT_SEND_TOPIC_TEMPLATE.replace(
+
ClusterMemberService.EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER,
clusterChannel);
+ }
+
+ public static String getClusterChannelReceiveTopic(final String
clusterChannel) {
+ return ClusterMemberService.EVENT_RECIEVE_TOPIC_TEMPLATE.replace(
+
ClusterMemberService.EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER,
clusterChannel);
+ }
+
+ public static boolean isValidChannelName(final String channelName) {
+ if (channelName == null || "".equals(channelName)) {
+ return false;
+ }
+ Matcher matcher = m_validNamePattern.matcher(channelName);
+ return matcher.matches();
+ }
+
+ public static boolean isValidMemberName(final String memberName) {
+ return isValidChannelName(memberName);
+ }
+}
Copied:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
(from r553,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java)
==============================================================================
---
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
Mon Jan 3 17:18:44 2011
@@ -27,6 +27,7 @@
import org.amdatu.core.fabric.cluster.ClusterMemberService;
import org.amdatu.core.fabric.cluster.LocalTopicMessage;
import org.amdatu.core.fabric.cluster.RoutableMessage;
+import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
import org.apache.felix.dm.Component;
import org.apache.felix.dm.DependencyManager;
import org.apache.felix.dm.ServiceDependency;
@@ -39,14 +40,17 @@
/**
* I manage cluster state
*/
-public abstract class BaseClusterMemberService implements ClusterMemberService
{
+public abstract class ClusterMemberServiceBase implements ClusterMemberService
{
private final Map<String, ClusterMember> m_clusterMembers = new
HashMap<String, ClusterMember>();
private final ReentrantReadWriteLock m_clusterMembersLock = new
ReentrantReadWriteLock();
private final String m_clusterId;
private final String m_memberId;
- private final Map<String, Object> m_properties;
+ private final Dictionary<String, Object> m_properties;
+
+ private final String m_recieveEventTopic;
+ private final String m_sendEventTopic;
// injected
private volatile DependencyManager m_dependencyManager;
@@ -60,11 +64,11 @@
* Constructors
********************************************************/
- public BaseClusterMemberService(String clusterGroupId, String
clusterMemberId,
+ public ClusterMemberServiceBase(String clusterGroupId, String
clusterMemberId,
Dictionary<String, Object> properties) {
m_clusterId = clusterGroupId;
m_memberId = clusterMemberId;
- m_properties = new HashMap<String, Object>();
+ m_properties = new Hashtable<String, Object>();
if (properties != null) {
Enumeration<String> enumeration = properties.keys();
while (enumeration.hasMoreElements()) {
@@ -72,6 +76,8 @@
m_properties.put(key, properties.get(key));
}
}
+ m_recieveEventTopic =
ClusterMemberUtilities.getClusterChannelReceiveTopic(m_clusterId);
+ m_sendEventTopic =
ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterId);
}
/********************************************************
@@ -79,27 +85,32 @@
********************************************************/
public final synchronized void init() {
+
@SuppressWarnings("unchecked")
Dictionary<String, Object> serviceProps =
m_component.getServiceProperties();
if (serviceProps == null) {
serviceProps = new Hashtable<String, Object>();
}
- serviceProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
m_clusterId);
- serviceProps.put(ClusterMemberService.CLUSTER_CLUSTERMEMBER_PROP,
m_memberId);
+
+ serviceProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterId);
+ serviceProps.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY,
m_memberId);
+ serviceProps.put(ClusterMemberService.SERVICE_CONFIGURATION_PROPERTY,
m_properties);
m_component.setServiceProperties(serviceProps);
ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
logServiceDependency.setService(LogService.class);
logServiceDependency.setRequired(true);
+ logServiceDependency.setInstanceBound(true);
m_component.add(logServiceDependency);
ServiceDependency eventAdminServiceDependency =
m_dependencyManager.createServiceDependency();
eventAdminServiceDependency.setService(EventAdmin.class);
eventAdminServiceDependency.setRequired(true);
+ eventAdminServiceDependency.setInstanceBound(true);
m_component.add(eventAdminServiceDependency);
Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(EventConstants.EVENT_TOPIC, new String[] {
EVENT_TOPIC_BROADCAST + "/" + m_clusterId });
+ props.put(EventConstants.EVENT_TOPIC, new String[] { m_sendEventTopic
});
m_broadcastEventHandlerComponent =
m_dependencyManager.createComponent();
m_broadcastEventHandlerComponent.setInterface(EventHandler.class.getName(),
props);
m_broadcastEventHandlerComponent.setImplementation(new
BroadcastEventHandler());
@@ -123,23 +134,27 @@
}
/********************************************************
- * ClusterMemberService
+ * Protected methods
********************************************************/
- public final String getClusterId() {
+ protected final LogService getLogService() {
+ return m_logService;
+ }
+
+ protected final String getClusterId() {
return m_clusterId;
}
- public final String getMemberId() {
+ protected final String getMemberId() {
return m_memberId;
}
- public final Dictionary<String, Object> getProperties() {
- return new Hashtable<String, Object>(m_properties);
+ protected final Dictionary<String, Object> getProperties() {
+ return m_properties;
}
- public final ClusterMember[] getClusterMembers() {
+ protected final ClusterMember[] getClusterMembers() {
m_clusterMembersLock.readLock().lock();
try {
return m_clusterMembers.values().toArray(new
ClusterMember[m_clusterMembers.size()]);
@@ -149,7 +164,7 @@
}
}
- public final ClusterMember getClusterMember(String memberId) {
+ protected final ClusterMember getClusterMember(final String memberId) {
m_clusterMembersLock.readLock().lock();
try {
return m_clusterMembers.get(memberId);
@@ -159,14 +174,6 @@
}
}
- /********************************************************
- * for implementing concrete classes
- ********************************************************/
-
- protected final LogService getLogService() {
- return m_logService;
- }
-
protected final void addClusterMember(ClusterMember clusterMember) {
m_clusterMembersLock.writeLock().lock();
try {
@@ -177,10 +184,10 @@
}
}
- protected final void removeClusterMember(ClusterMember clusterMember) {
+ protected final void removeClusterMember(String memberId) {
m_clusterMembersLock.writeLock().lock();
try {
- m_clusterMembers.remove(clusterMember.getMemberId());
+ m_clusterMembers.remove(memberId);
}
finally {
m_clusterMembersLock.writeLock().unlock();
@@ -190,15 +197,20 @@
protected final void dispatchMessage(Object message) {
Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(EVENT_MESSAGE_PROPERTY, message);
- // FIXME DO NOT REBROADCAST
- String topic = EVENT_TOPIC_BROADCAST;
if (message instanceof LocalTopicMessage) {
- topic = ((LocalTopicMessage) message).getLocalTopic();
+ String topic = ((LocalTopicMessage) message).getLocalTopic();
+ Event broadCastEvent = new Event(topic, props);
+ m_eventAdmin.postEvent(broadCastEvent);
+ return;
}
- Event broadCastEvent = new Event(topic, props);
+ Event broadCastEvent = new Event(m_recieveEventTopic, props);
m_eventAdmin.postEvent(broadCastEvent);
}
+ /********************************************************
+ * Abstract methods
+ ********************************************************/
+
protected abstract void onInit();
protected abstract void onDestroy();
@@ -212,7 +224,7 @@
protected abstract void doSend(ClusterMember[] clusterMember, Object
message);
/********************************************************
- * helper classes
+ * Helper classes
********************************************************/
class BroadcastEventHandler implements EventHandler {
@@ -248,14 +260,10 @@
if (clusterMember != null) {
doSend(new ClusterMember[] { clusterMember }, message);
+ return;
}
- else {
- doBroadcast(message);
- }
- }
- else {
- doBroadcast(message);
}
+ doBroadcast(message);
}
}
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
Mon Jan 3 17:18:44 2011
@@ -22,16 +22,19 @@
import java.io.Serializable;
import java.util.Dictionary;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMember;
+import org.amdatu.core.fabric.cluster.ClusterMemberService;
import org.amdatu.core.fabric.cluster.internal.ClusterMemberImpl;
import org.amdatu.core.fabric.cluster.internal.tribes.ChannelCreator;
-import org.amdatu.core.fabric.cluster.service.BaseClusterMemberService;
+import org.amdatu.core.fabric.cluster.service.ClusterMemberServiceBase;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelListener;
@@ -40,12 +43,15 @@
import org.apache.catalina.tribes.MembershipListener;
import org.osgi.service.log.LogService;
-public final class TribesClusterMemberServiceImpl extends
BaseClusterMemberService {
+public final class TribesClusterMemberServiceImpl extends
ClusterMemberServiceBase {
public static final String CLUSTER_TRIBES_ARGS_PROP =
"org.amdatu.fabric.cluster.tribes.args";
- private final Map<ClusterMember, Member> m_clusterMemberMembers = new
HashMap<ClusterMember, Member>();
- private final ReentrantReadWriteLock m_clusterMemberMembersLock = new
ReentrantReadWriteLock();
+ private final Map<String, Member> m_memberIdMembers = new HashMap<String,
Member>();
+ private final ReentrantReadWriteLock m_memberIdMembersLock = new
ReentrantReadWriteLock();
+
+ private final Set<Member> m_faultyMembers = new HashSet<Member>();
+ private final ReentrantReadWriteLock m_faultyMembersLock = new
ReentrantReadWriteLock();
private volatile ManagedChannel m_managedChannel;
@@ -76,7 +82,8 @@
CLUSTER_TRIBES_ARGS_PROP));
Properties props = new Properties();
- props.setProperty(CLUSTER_CLUSTERMEMBER_PROP, getMemberId());
+ props.setProperty(SERVICE_CLUSTERCHANNEL_PROPERTY, getClusterId());
+ props.setProperty(SERVICE_CLUSTERMEMBER_PROPERTY, getMemberId());
m_managedChannel.addMembershipListener(new
TribesMembershipListener());
m_managedChannel.addChannelListener(new TribesChannelListener());
m_managedChannel.getMembershipService().setPayload(getPayload(props));
@@ -103,66 +110,69 @@
@Override
public void doBroadcast(Object message) {
- if (message instanceof Serializable) {
- try {
- Member[] members = m_managedChannel.getMembers();
- if (members.length > 0) {
- m_managedChannel.send(members, (Serializable) message,
- Channel.SEND_OPTIONS_ASYNCHRONOUS);
- }
- else {
- getLogService().log(
- LogService.LOG_WARNING,
- "Dropping message during broadcast because there are
no members on my channel: "
+ if (!(message instanceof Serializable)) {
+ getLogService().log(LogService.LOG_ERROR,
+ "Dropping message during broadcast because is is not
Serializable: "
+ + message.toString());
+ return;
+ }
+ m_memberIdMembersLock.readLock().lock();
+ try {
+ if (m_memberIdMembers.size() == 0) {
+ getLogService().log(LogService.LOG_WARNING,
+ "Dropping message during send because there are no
active members on my channel: "
+ message.toString());
- }
- }
- catch (ChannelException e) {
- getLogService().log(LogService.LOG_ERROR, "Exception during
send on managed channel", e);
+ return;
}
+ Member[] members = m_memberIdMembers.values().toArray(new
Member[m_memberIdMembers.size()]);
+ m_managedChannel.send(members, (Serializable) message,
Channel.SEND_OPTIONS_ASYNCHRONOUS);
}
- else {
+ catch (ChannelException e) {
getLogService().log(LogService.LOG_ERROR,
- "Dropping message of type " + message.getClass().getName() + "
because it is not Serializable: "
- + message.toString());
+ "Exception during send on managed channel: " +
message.toString(), e);
+ }
+ finally {
+ m_memberIdMembersLock.readLock().unlock();
}
}
@Override
public void doSend(ClusterMember[] clusterMembers, Object message) {
- if (message instanceof Serializable) {
- try {
- List<Member> members = new LinkedList<Member>();
- m_clusterMemberMembersLock.readLock().lock();
- try {
- for (ClusterMember clusterMember : clusterMembers) {
- Member member =
m_clusterMemberMembers.get(clusterMember);
- if (member != null) {
- members.add(member);
- }
- }
- }
- finally {
- m_clusterMemberMembersLock.readLock().unlock();
- }
- if (members.size() > 0) {
- m_managedChannel.send(members.toArray(new
Member[members.size()]), (Serializable) message,
- Channel.SEND_OPTIONS_ASYNCHRONOUS);
- }
- else {
- getLogService().log(LogService.LOG_WARNING,
- "Dropping message during send because there are no
matching members on my channel: "
- + message.toString());
+ if (!(message instanceof Serializable)) {
+ getLogService().log(LogService.LOG_ERROR,
+ "Dropping message during broadcast because is is not
Serializable: "
+ + message.toString());
+ return;
+ }
+ List<Member> memberList = new LinkedList<Member>();
+ m_memberIdMembersLock.readLock().lock();
+ try {
+ for (ClusterMember clusterMember : clusterMembers) {
+ Member member =
m_memberIdMembers.get(clusterMember.getMemberId());
+ if (member != null) {
+ memberList.add(member);
}
}
- catch (ChannelException e) {
- getLogService().log(LogService.LOG_ERROR, "Exception during
send on managed channel", e);
+ if (memberList.size() == 0) {
+ getLogService().log(LogService.LOG_WARNING,
+ "Dropping message during send because there are no
matching members on my channel: "
+ + message.toString());
+ return;
}
+ Member[] members = memberList.toArray(new
Member[memberList.size()]);
+ m_managedChannel.send(members, (Serializable) message,
Channel.SEND_OPTIONS_ASYNCHRONOUS);
+ }
+ catch (ChannelException e) {
+ getLogService().log(LogService.LOG_ERROR,
+ "Exception during send on managed channel: " +
message.toString(), e);
+ }
+ finally {
+ m_memberIdMembersLock.readLock().unlock();
}
}
/********************************************************
- * Utility methods
+ * Private methods
********************************************************/
private byte[] getPayload(Properties props) throws IOException {
@@ -178,6 +188,48 @@
return props;
}
+ private void addMemberIdMember(String clusterMember, Member member) {
+ m_memberIdMembersLock.writeLock().lock();
+ try {
+ m_memberIdMembers.put(clusterMember, member);
+ }
+ finally {
+ m_memberIdMembersLock.writeLock().unlock();
+ }
+ addClusterMember(new ClusterMemberImpl(clusterMember));
+ }
+
+ private void removeMemberIdMember(String clusterMember) {
+ m_memberIdMembersLock.writeLock().lock();
+ try {
+ m_memberIdMembers.remove(clusterMember);
+ }
+ finally {
+ m_memberIdMembersLock.writeLock().unlock();
+ }
+ removeClusterMember(clusterMember);
+ }
+
+ private void addFaultyMember(Member member) {
+ m_faultyMembersLock.writeLock().lock();
+ try {
+ m_faultyMembers.add(member);
+ }
+ finally {
+ m_faultyMembersLock.writeLock().unlock();
+ }
+ }
+
+ private boolean removeFaultyMember(Member member) {
+ m_faultyMembersLock.writeLock().lock();
+ try {
+ return m_faultyMembers.remove(member);
+ }
+ finally {
+ m_faultyMembersLock.writeLock().unlock();
+ }
+ }
+
/********************************************************
* Helper classes
********************************************************/
@@ -186,17 +238,24 @@
public void memberAdded(Member member) {
try {
- getLogService().log(LogService.LOG_DEBUG, "Member added: " +
member.toString());
- ClusterMember clusterMember = new
ClusterMemberImpl(getProperties(member.getPayload())
- .getProperty(CLUSTER_CLUSTERMEMBER_PROP));
- m_clusterMemberMembersLock.writeLock().lock();
- try {
- m_clusterMemberMembers.put(clusterMember, member);
- addClusterMember(clusterMember);
+ Properties props = getProperties(member.getPayload());
+ String groupchannel = (String)
props.get(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY);
+ if (groupchannel == null ||
!groupchannel.equals(getClusterId())) {
+ getLogService().log(LogService.LOG_ERROR,
+ "Member joined with invalid channelid! Will ignore it:
" + member.toString());
+ addFaultyMember(member);
+ return;
}
- finally {
- m_clusterMemberMembersLock.writeLock().unlock();
+ String channelmember = (String)
props.get(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY);
+ if (channelmember == null ||
channelmember.equals(getMemberId())
+ || getClusterMember(channelmember) != null) {
+ getLogService().log(LogService.LOG_ERROR,
+ "Member joined with invalid memberid! Will ignore it:
" + member.toString());
+ addFaultyMember(member);
+ return;
}
+ addMemberIdMember(channelmember, member);
+ getLogService().log(LogService.LOG_DEBUG, "Member added: " +
member.toString());
}
catch (Exception e) {
getLogService().log(LogService.LOG_ERROR, "Exception while
adding member: " + member.toString(), e);
@@ -205,27 +264,12 @@
public void memberDisappeared(Member member) {
try {
- // FIXME use memberid to tuple map to reduce object creation
- getLogService().log(LogService.LOG_DEBUG, "Member disappeared:
" + member.toString());
- String memberId = getProperties(member.getPayload())
- .getProperty(CLUSTER_CLUSTERMEMBER_PROP);
- ClusterMember toBeRemoved = null;
- m_clusterMemberMembersLock.writeLock().lock();
- try {
- for (ClusterMember clusterMember :
m_clusterMemberMembers.keySet()) {
- if (clusterMember.getMemberId().equals(memberId)) {
- toBeRemoved = clusterMember;
- break;
- }
- }
- if (toBeRemoved != null) {
- removeClusterMember(toBeRemoved);
- m_clusterMemberMembers.remove(toBeRemoved);
- }
- }
- finally {
- m_clusterMemberMembersLock.writeLock().unlock();
+ if (removeFaultyMember(member)) {
+ getLogService().log(LogService.LOG_DEBUG, "Faulty member
disappeared: " + member.toString());
+ return;
}
+ String memberId =
getProperties(member.getPayload()).getProperty(SERVICE_CLUSTERMEMBER_PROPERTY);
+ removeMemberIdMember(memberId);
}
catch (Exception e) {
getLogService().log(LogService.LOG_ERROR, "Exception while
removing member: " + member.toString(), e);
@@ -236,6 +280,17 @@
class TribesChannelListener implements ChannelListener {
public boolean accept(Serializable message, Member member) {
+ m_faultyMembersLock.readLock().lock();
+ try {
+ if (m_faultyMembers.contains(member)) {
+ getLogService().log(LogService.LOG_WARNING,
+ "Dropping message recieved from faulty member: " +
member.toString());
+ return false;
+ }
+ }
+ finally {
+ m_faultyMembersLock.readLock().unlock();
+ }
return true;
}
@@ -243,23 +298,4 @@
dispatchMessage(message);
}
}
-
- static class ClusterMemberMemberTuple {
-
- private final ClusterMember m_clusterMember;
- private final Member m_member;
-
- public ClusterMemberMemberTuple(final ClusterMember clusterMember,
final Member member) {
- m_clusterMember = clusterMember;
- m_member = member;
- }
-
- public ClusterMember getClusterMember() {
- return m_clusterMember;
- }
-
- public Member getMember() {
- return m_member;
- }
- }
}
\ No newline at end of file
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
Mon Jan 3 17:18:44 2011
@@ -30,11 +30,17 @@
manager.add(createComponent()
.setInterface(FabricManagerService.class.getName(), null)
- .setImplementation(new FabricManagerServiceImpl())
+ .setFactory(new FabricManagerServiceFactory(), "getInstance")
.add(createConfigurationDependency().setPid(FabricManagerService.CONFIGURATION_PID)));
}
@Override
public void destroy(BundleContext context, DependencyManager manager)
throws Exception {
}
+
+ static class FabricManagerServiceFactory {
+ public FabricManagerService getInstance() {
+ return new FabricManagerServiceImpl();
+ }
+ }
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
Mon Jan 3 17:18:44 2011
@@ -29,6 +29,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
import org.amdatu.core.fabric.remote.ServiceEndPoint;
import org.amdatu.core.fabric.remote.service.DistributionServiceImpl;
import org.apache.felix.dm.Component;
@@ -71,9 +72,9 @@
* Constructors
********************************************************/
- public LocalServiceInvocationHandler(String clusterGroupId, String
serviceGroupId,
- ServiceEndPoint serviceEndpoint,
- Class<?>[] interfaceClasses) {
+ public LocalServiceInvocationHandler(String clusterGroupId, String
serviceGroupId,
+ ServiceEndPoint serviceEndpoint,
+ Class<?>[] interfaceClasses) {
m_clusterGroupId = clusterGroupId;
m_serviceGroupId = serviceGroupId;
m_serviceEndpoint = serviceEndpoint;
@@ -97,16 +98,17 @@
********************************************************/
public synchronized void init() {
- ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
- logServiceDependency.setService(LogService.class);
- logServiceDependency.setRequired(true);
- m_component.add(logServiceDependency);
-
ServiceDependency eventAdminServiceDependency =
m_dependencyManager.createServiceDependency();
eventAdminServiceDependency.setService(EventAdmin.class);
eventAdminServiceDependency.setRequired(true);
+ eventAdminServiceDependency.setInstanceBound(true);
m_component.add(eventAdminServiceDependency);
+ ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
+ logServiceDependency.setService(LogService.class);
+ logServiceDependency.setRequired(false);
+ m_component.add(logServiceDependency);
+
Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(EventConstants.EVENT_TOPIC,
new String[] {
DistributionUtilities.getLocalResponseTopic(m_clusterGroupId, m_serviceGroupId)
});
@@ -144,7 +146,7 @@
Dictionary<String, Object> eventPayload = new Hashtable<String,
Object>();
eventPayload.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY,
message);
Event event =
- new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" +
m_clusterGroupId,
+ new
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
eventPayload);
m_eventAdmin.postEvent(event);
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
Mon Jan 3 17:18:44 2011
@@ -25,6 +25,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
import org.amdatu.core.fabric.remote.DiscoveryService;
import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
import org.amdatu.core.fabric.remote.RemoteServiceEndPoint;
@@ -46,6 +47,7 @@
/**
* I keep track of local RemotableServiceEndpoint services and publish them in the cluster
* I listen to the cluster and publish local RemoteServiceEndpoint services for them
+ *
* TODO support LOOKUP requests for more fine grained discovery (use ListenerHook)
*/
public final class DiscoveryServiceImpl implements DiscoveryService {
@@ -81,37 +83,41 @@
********************************************************/
public synchronized void init() {
+
@SuppressWarnings("unchecked")
Dictionary<String, Object> discoveryProps =
m_component.getServiceProperties();
if (discoveryProps == null) {
discoveryProps = new Hashtable<String, Object>();
}
- discoveryProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
m_clusterGroupId);
+
discoveryProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
discoveryProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
m_component.setServiceProperties(discoveryProps);
- ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
- logServiceDependency.setService(LogService.class);
- logServiceDependency.setRequired(true);
- m_component.add(logServiceDependency);
-
ServiceDependency eventAdminServiceDependency =
m_dependencyManager.createServiceDependency();
eventAdminServiceDependency.setService(EventAdmin.class);
eventAdminServiceDependency.setRequired(true);
+ eventAdminServiceDependency.setInstanceBound(true);
m_component.add(eventAdminServiceDependency);
ServiceDependency clusterMemberDependency =
m_dependencyManager.createServiceDependency();
clusterMemberDependency
.setService(
ClusterMemberService.class,
- "(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "=" +
m_clusterGroupId + ")")
+ "(" + ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY +
"=" + m_clusterGroupId + ")")
.setRequired(true);
+ clusterMemberDependency.setInstanceBound(true);
m_component.add(clusterMemberDependency);
+ ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
+ logServiceDependency.setService(LogService.class);
+ logServiceDependency.setRequired(true);
+ logServiceDependency.setInstanceBound(true);
+ m_component.add(logServiceDependency);
+
ServiceDependency remotableServiceEndpointsDependecy =
m_dependencyManager.createServiceDependency();
remotableServiceEndpointsDependecy
.setService(RemotableServiceEndpoint.class,
- "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "="
+ "(&(" +
ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
+ m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ m_serviceGroupId + "))")
.setCallbacks("remotableServiceEndPointAdded",
"remotableServiceEndPointRemoved")
@@ -133,7 +139,7 @@
public synchronized void start() {
m_logService.log(LogService.LOG_INFO, "Starting " + toString());
m_dependencyManager.add(m_discoveryEventHandlerComponent);
- m_evenAdmin.postEvent(createdDiscoveryEvent());
+ m_evenAdmin.postEvent(createDiscoveryEvent());
}
public synchronized void stop() {
@@ -188,12 +194,12 @@
* Private methods
********************************************************/
- private Event createdDiscoveryEvent() {
+ private Event createDiscoveryEvent() {
Dictionary<String, Object> eventProps = new Hashtable<String,
Object>();
eventProps.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY,
new EndpointDiscoveryMessage(m_clusterGroupId, m_serviceGroupId));
Event discoveryEvent =
- new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" +
m_clusterGroupId,
+ new
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
eventProps);
return discoveryEvent;
}
@@ -202,7 +208,7 @@
Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, new
EndpointPublishMessage(serviceEndPoint));
Event event =
- new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" +
m_clusterGroupId,
+ new
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
props);
return event;
}
@@ -211,7 +217,7 @@
Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, new
EndpointDepublishMessage(serviceEndPoint));
Event event =
- new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" +
m_clusterGroupId,
+ new
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
props);
return event;
}
@@ -269,7 +275,7 @@
return;
}
Dictionary<String, Object> distributionProps = new
Hashtable<String, Object>();
-
distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
+
distributionProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
@@ -279,7 +285,7 @@
.setImplementation(new
RemoteServiceEndPointImpl(serviceEndPoint));
serviceComponent.add(m_dependencyManager.createServiceDependency().setService(
DiscoveryService.class,
- "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "="
+ "(&(" + ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY +
"="
+ m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ m_serviceGroupId + "))")
.setRequired(true));
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
Mon Jan 3 17:18:44 2011
@@ -26,6 +26,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
import org.amdatu.core.fabric.remote.DiscoveryService;
import org.amdatu.core.fabric.remote.DistributionService;
import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
@@ -78,25 +79,26 @@
private final String m_serviceGroupId;
/********************************************************
- * Constructors
+ * Constructor methods
********************************************************/
- public DistributionServiceImpl(String clusterGroupId, String
serviceGroupId) {
+ public DistributionServiceImpl(String clusterGroupId, String
serviceGroupId) {
m_clusterGroupId = clusterGroupId;
m_serviceGroupId = serviceGroupId;
}
/********************************************************
- * Service lifecycle
+ * Lifecycle methods
********************************************************/
public synchronized void init() {
+
@SuppressWarnings("unchecked")
Dictionary<String, Object> distributionProps =
m_component.getServiceProperties();
if (distributionProps == null) {
distributionProps = new Hashtable<String, Object>();
}
- distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
m_clusterGroupId);
+
distributionProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
distributionProps.put(DistributionService.DISTRIBUTION_CONFIGS_SUPPORTED_PROP,
DistributionService.DISTRIBUTION_CONFIGS_SUPPORTED);
@@ -104,24 +106,27 @@
DistributionService.DISTRIBUTION_INTENTS_SUPPORTED);
m_component.setServiceProperties(distributionProps);
- ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
- logServiceDependency.setService(LogService.class);
- logServiceDependency.setRequired(true);
- m_component.add(logServiceDependency);
-
ServiceDependency eventAdminServiceDependency =
m_dependencyManager.createServiceDependency();
eventAdminServiceDependency.setService(EventAdmin.class);
eventAdminServiceDependency.setRequired(true);
+ eventAdminServiceDependency.setInstanceBound(true);
m_component.add(eventAdminServiceDependency);
ServiceDependency clusterMemberDependency =
m_dependencyManager.createServiceDependency();
clusterMemberDependency
.setService(
ClusterMemberService.class,
- "(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "=" +
m_clusterGroupId + ")")
+ "(" + ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY +
"=" + m_clusterGroupId + ")")
.setRequired(true);
+ clusterMemberDependency.setInstanceBound(true);
m_component.add(clusterMemberDependency);
+ ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
+ logServiceDependency.setService(LogService.class);
+ logServiceDependency.setRequired(true);
+ logServiceDependency.setInstanceBound(true);
+ m_component.add(logServiceDependency);
+
ServiceDependency remotableServicesDependecy =
m_dependencyManager.createServiceDependency();
remotableServicesDependecy
.setService(
@@ -135,7 +140,7 @@
ServiceDependency remoteServiceEndpointsDependecy =
m_dependencyManager.createServiceDependency();
remoteServiceEndpointsDependecy
.setService(RemoteServiceEndPoint.class,
- "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "="
+ "(&(" +
ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
+ m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ m_serviceGroupId + "))")
.setCallbacks("remoteServiceEndPointAdded",
"remoteServiceEndPointRemoved")
@@ -172,6 +177,12 @@
public void localRemotableServiceAdded(final ServiceReference
serviceReference /* , Object Service */) {
ServiceEndPoint serviceEndPoint =
serviceEndPointFromServiceReference(serviceReference);
+ if (!isServiceEndpointForServiceGroup(serviceEndPoint)) {
+ m_logService
+ .log(LogService.LOG_DEBUG,
+ "Ignoring ServiceEndpoint not for this serviceGroup: " +
serviceEndPoint.toString());
+ return;
+ }
if (!isServiceEndpointConfigurationSupported(serviceEndPoint)) {
m_logService
.log(LogService.LOG_WARNING, "Unsupported ServiceEndPoint
configuration " + serviceEndPoint.toString());
@@ -195,6 +206,12 @@
public void localRemotableServiceRemoved(ServiceReference serviceReference
/* , Object Service */) {
ServiceEndPoint serviceEndPoint =
serviceEndPointFromServiceReference(serviceReference);
+ if (!isServiceEndpointForServiceGroup(serviceEndPoint)) {
+ m_logService
+ .log(LogService.LOG_DEBUG,
+ "Ignoring ServiceEndpoint not for this serviceGroup: " +
serviceEndPoint.toString());
+ return;
+ }
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().lock();
try {
if
(m_localServiceEndPointServiceReferenceComponents.containsKey(serviceEndPoint))
{
@@ -296,7 +313,7 @@
private Component createRemotableEndPointComponent(final ServiceEndPoint
serviceEndPoint) {
Dictionary<String, Object> distributionProps = new Hashtable<String,
Object>();
- distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
m_clusterGroupId);
+
distributionProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
Component serviceComponent =
m_dependencyManager.createComponent()
@@ -307,7 +324,7 @@
.createServiceDependency()
.setService(
DistributionService.class,
- "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP +
"="
+ "(&(" +
ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
+ m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ m_serviceGroupId + "))")
.setRequired(true);
@@ -344,7 +361,7 @@
props.remove(DistributionService.SERVICE_EXPORTED_INTERFACES_PROP);
props.remove(DistributionService.SERVICE_EXPORTED_INTENTS_PROP);
props.remove(DistributionService.SERVICE_EXPORTED_INTENTS_EXTRA_PROP);
- props.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
m_clusterGroupId);
+ props.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
props.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
props.put(DistributionService.SERVICE_IMPORTED_PROP, "true");
props.put(DistributionService.SERVICE_INTENTS_PROP, importedIntents);
@@ -358,7 +375,7 @@
.createServiceDependency()
.setService(
DistributionService.class,
- "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP +
"="
+ "(&(" +
ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
+ m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ m_serviceGroupId + "))")
.setRequired(true);
@@ -366,6 +383,29 @@
return serviceComponent;
}
+    /**
+     * Decides whether the given endpoint belongs to the service group of this
+     * distribution service, based on the endpoint property
+     * {@link DiscoveryService#REMOTE_SERVICEGROUPID_PROP}.
+     * <ul>
+     * <li>property absent: accepted</li>
+     * <li>{@code String}: accepted when empty or equal to this service group id</li>
+     * <li>{@code String[]}: accepted when any entry equals this service group id</li>
+     * <li>any other type: rejected</li>
+     * </ul>
+     * NOTE(review): an empty string is accepted as a plain {@code String} but not as an
+     * array entry; this mirrors the existing behavior, confirm whether that is intended.
+     *
+     * @param serviceEndpoint the endpoint whose properties are inspected
+     * @return {@code true} if the endpoint should be handled by this service group
+     */
+    private boolean isServiceEndpointForServiceGroup(ServiceEndPoint serviceEndpoint) {
+        Object serviceGroupProperty =
+            serviceEndpoint.getProperties().get(DiscoveryService.REMOTE_SERVICEGROUPID_PROP);
+        if (serviceGroupProperty == null) {
+            // No service group declared on the endpoint.
+            return true;
+        }
+        if (serviceGroupProperty instanceof String) {
+            String serviceGroupValue = (String) serviceGroupProperty;
+            return "".equals(serviceGroupValue) || serviceGroupValue.equals(m_serviceGroupId);
+        }
+        if (serviceGroupProperty instanceof String[]) {
+            for (String serviceGroupValue : (String[]) serviceGroupProperty) {
+                // Skip null entries instead of failing with a NullPointerException.
+                if (serviceGroupValue != null && serviceGroupValue.equals(m_serviceGroupId)) {
+                    return true;
+                }
+            }
+        }
+        // Unsupported property type, or no array entry matched.
+        return false;
+    }
+
private boolean isServiceEndpointConfigurationSupported(ServiceEndPoint
serviceEndpoint) {
if
(!DistributionUtilities.isConfigurationTypeSupported(serviceEndpoint.getProperties()))
{
System.err.println("No supported configuration type");
@@ -405,7 +445,7 @@
private void recieveEndpointInvokeMessage(EndpointInvokeMessage
endpointInvokeMessage) {
ServiceEndPoint serviceEndPoint =
endpointInvokeMessage.getServiceEndPoint();
- //FIXME address this
+ // FIXME address this
serviceEndPoint.setMemberId(null);
Object serviceResponse = null;
@@ -493,7 +533,7 @@
originMemberId, originServiceGroup, responsePayload));
Event responseEvent =
- new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" +
m_clusterGroupId,
+ new
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
eventPayload);
return responseEvent;
}
@@ -516,6 +556,7 @@
}
static class ServiceReferenceComponentTuple {
+
private final ServiceReference m_serviceReference;
private final Component m_component;
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
Mon Jan 3 17:18:44 2011
@@ -24,6 +24,7 @@
import org.amdatu.core.fabric.FabricManagerService;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
import
org.amdatu.core.fabric.cluster.service.tribes.TribesClusterMemberServiceImpl;
import org.amdatu.core.fabric.remote.DiscoveryService;
import org.amdatu.core.fabric.remote.DistributionService;
@@ -35,7 +36,7 @@
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.log.LogService;
-public class FabricManagerServiceImpl implements FabricManagerService {
+public final class FabricManagerServiceImpl implements FabricManagerService {
private final Map<String, Component> m_clusterMemberComponents = new
HashMap<String, Component>();
private final ReentrantReadWriteLock m_clusterMemberComponentsLock = new
ReentrantReadWriteLock();
@@ -51,13 +52,21 @@
private volatile LogService m_logService;
/********************************************************
- * Service lifecycle
+ * Constructor methods
+ ********************************************************/
+ /**
+ * Explicit no-argument constructor with an intentionally empty body.
+ * NOTE(review): presumably declared so that a factory can instantiate the
+ * class with a plain "new FabricManagerServiceImpl()" - confirm.
+ */
+ public FabricManagerServiceImpl() {
+ }
+
+ /********************************************************
+ * Lifecycle methods
********************************************************/
public final synchronized void init() {
ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
logServiceDependency.setService(LogService.class);
logServiceDependency.setRequired(true);
+ logServiceDependency.setInstanceBound(true);
m_component.add(logServiceDependency);
}
@@ -73,7 +82,7 @@
}
/********************************************************
- * ManagedService (controlled by dependencymanager)
+ * ManagedService methods
********************************************************/
public synchronized void updated(Dictionary<String, Object> dictionary)
throws ConfigurationException {
@@ -113,27 +122,33 @@
}
/********************************************************
- * FabricManagerService interface
+ * FabricManagerService methods
********************************************************/
public boolean createClusterMember(String clusterGroupId, String
clusterMemberId,
Dictionary<String, Object> properties) {
- if (clusterMemberId == null || "".equals(clusterMemberId)) {
+ if (!ClusterMemberUtilities.isValidChannelName(clusterGroupId)) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to create cluster.
Invalid clustername: " + clusterGroupId);
+ return false;
+ }
+ if (!ClusterMemberUtilities.isValidMemberName(clusterMemberId)) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to create cluster.
Invalid membername: " + clusterMemberId);
return false;
}
Dictionary<String, Object> svcProperties = new Hashtable<String,
Object>();
- svcProperties.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
clusterMemberId);
- svcProperties.put(ClusterMemberService.CLUSTER_CLUSTERMEMBER_PROP,
clusterMemberId);
+// svcProperties.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
clusterMemberId);
+// svcProperties.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY,
clusterMemberId);
Component clusterMemberComponent =
- m_dependencyManager.createComponent()
+ m_dependencyManager
+ .createComponent()
.setInterface(ClusterMemberService.class.getName(), properties)
- .setImplementation(new
TribesClusterMemberServiceImpl(clusterGroupId, clusterMemberId, properties))
+ .setFactory(new ClusterMemberServiceFactory(clusterGroupId,
clusterMemberId, properties), "getInstance")
.add(
m_dependencyManager.createServiceDependency()
.setService(FabricManagerService.class)
- .setRequired(true));
+ .setRequired(true).setInstanceBound(true));
m_clusterMemberComponentsLock.writeLock().lock();
try {
@@ -177,11 +192,11 @@
.createComponent()
.setInterface(DiscoveryService.class.getName(),
null)
- .setImplementation(new DiscoveryServiceImpl(clusterGroupId,
serviceGroupId))
+ .setFactory(new DiscoveryServiceFactory(clusterGroupId,
serviceGroupId), "getInstance")
.add(
m_dependencyManager.createServiceDependency()
.setService(FabricManagerService.class)
- .setRequired(true));
+ .setRequired(true).setInstanceBound(true));
m_discoveryComponentsLock.writeLock().lock();
try {
@@ -222,11 +237,11 @@
m_dependencyManager
.createComponent()
.setInterface(DistributionService.class.getName(), null)
- .setImplementation(new DistributionServiceImpl(clusterGroupId,
serviceGroupId))
+ .setFactory(new DistributionServiceFactory(clusterGroupId,
serviceGroupId), "getInstance")
.add(
m_dependencyManager.createServiceDependency()
.setService(FabricManagerService.class)
- .setRequired(true));
+ .setRequired(true).setInstanceBound(true));
m_distributionComponentsLock.writeLock().lock();
try {
@@ -245,7 +260,7 @@
}
public boolean removeDistribution(String clusterGroupId, String
serviceGroupId) {
- // FIXME improve checks
+ // TODO improve checks
m_distributionComponentsLock.writeLock().lock();
try {
@@ -261,7 +276,63 @@
}
}
+ /********************************************************
+ * Private methods
+ ********************************************************/
+
private String getKey(String clusterGroupId, String clusterMemberId,
String serviceGroupId) {
return clusterGroupId + "#" + clusterMemberId + "#" + serviceGroupId;
}
+
+ /********************************************************
+ * Helper classes
+ ********************************************************/
+ /**
+ * Factory that creates a TribesClusterMemberServiceImpl each time getInstance()
+ * is called. It is handed to setFactory(..., "getInstance"), so the method is
+ * referenced by that string name and must keep it.
+ */
+ static class ClusterMemberServiceFactory {
+ // Constructor arguments captured for the service created in getInstance().
+ private final String m_clusterGroupId;
+ private final String m_clusterMemberId;
+ private final Dictionary<String, Object> m_properties;
+ // Stores the arguments as given; the properties dictionary is kept by reference, not copied.
+ public ClusterMemberServiceFactory(String clusterGroupId, String
clusterMemberId,
+ Dictionary<String, Object> properties) {
+ m_clusterGroupId = clusterGroupId;
+ m_clusterMemberId = clusterMemberId;
+ m_properties = properties;
+ }
+ // Returns a new service instance built from the captured arguments.
+ public ClusterMemberService getInstance() {
+ return new TribesClusterMemberServiceImpl(m_clusterGroupId,
m_clusterMemberId, m_properties);
+ }
+ }
+ /**
+ * Factory that creates a DiscoveryServiceImpl each time getInstance() is called.
+ * It is handed to setFactory(..., "getInstance"), so the method is referenced
+ * by that string name and must keep it.
+ */
+ static class DiscoveryServiceFactory {
+ // Constructor arguments captured for the service created in getInstance().
+ private final String m_clusterGroupId;
+ private final String m_serviceGroupId;
+ // Stores both ids unchanged.
+ public DiscoveryServiceFactory(String clusterGroupId, String
serviceGroupId) {
+ m_clusterGroupId = clusterGroupId;
+ m_serviceGroupId = serviceGroupId;
+ }
+ // Returns a new service instance built from the captured ids.
+ public DiscoveryService getInstance() {
+ return new DiscoveryServiceImpl(m_clusterGroupId,
m_serviceGroupId);
+ }
+ }
+ /**
+ * Factory that creates a DistributionServiceImpl each time getInstance() is called.
+ * It is handed to setFactory(..., "getInstance"), so the method is referenced
+ * by that string name and must keep it.
+ */
+ static class DistributionServiceFactory {
+ // Constructor arguments captured for the service created in getInstance().
+ private final String m_clusterGroupId;
+ private final String m_serviceGroupId;
+ // Stores both ids unchanged.
+ public DistributionServiceFactory(String clusterGroupId, String
serviceGroupId) {
+ m_clusterGroupId = clusterGroupId;
+ m_serviceGroupId = serviceGroupId;
+ }
+ // Returns a new service instance built from the captured ids.
+ public DistributionService getInstance() {
+ return new DistributionServiceImpl(m_clusterGroupId,
m_serviceGroupId);
+ }
+ }
}
Added:
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/ChannelMemberUtilitiesTest.java
==============================================================================
--- (empty file)
+++
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/ChannelMemberUtilitiesTest.java
Mon Jan 3 17:18:44 2011
@@ -0,0 +1,43 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.remote.service;
+
+import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for the channel name validation in {@link ClusterMemberUtilities}.
+ */
+public class ChannelMemberUtilitiesTest {
+
+    /**
+     * Checks {@code isValidChannelName} against names that must be accepted
+     * (letters, digits, underscore, hyphen) and names that must be rejected
+     * (null, empty, blank, leading or trailing space, dot, slash).
+     */
+    @Test
+    public void testIsValidChannelName() {
+        Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("a"));
+        Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("hello"));
+        Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("hello_world"));
+        Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("hello-world"));
+        Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("hello_123"));
+        Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("123hello_"));
+
+        Assert.assertFalse(ClusterMemberUtilities.isValidChannelName(null));
+        Assert.assertFalse(ClusterMemberUtilities.isValidChannelName(""));
+        Assert.assertFalse(ClusterMemberUtilities.isValidChannelName(" "));
+        // Leading and trailing whitespace must be rejected as well.
+        Assert.assertFalse(ClusterMemberUtilities.isValidChannelName(" hello"));
+        Assert.assertFalse(ClusterMemberUtilities.isValidChannelName("hello "));
+        Assert.assertFalse(ClusterMemberUtilities.isValidChannelName("hello.world"));
+        Assert.assertFalse(ClusterMemberUtilities.isValidChannelName("hello/world"));
+    }
+
+}
Modified:
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
(original)
+++
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
Mon Jan 3 17:18:44 2011
@@ -1,3 +1,19 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
package org.amdatu.core.fabric.remote.service;
import java.util.Dictionary;