Author: bdekruijff at gmail.com
Date: Fri Dec 31 17:13:10 2010
New Revision: 553
Log:
[sandbox] Major fabric refactor changing to whiteboard/eventadmin model
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
- copied, changed from r534,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java
- copied, changed from r534,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
- copied, changed from r534,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
Removed:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMessageListener.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMessageService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterTopicListener.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/TopicMessageWrapper.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMessageServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java
Modified:
sandbox/bdekruijff/fabric/pom.xml
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelCreator.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DiscoveryService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/RemoteServiceEndPoint.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DistributionUtilities.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
Modified: sandbox/bdekruijff/fabric/pom.xml
==============================================================================
--- sandbox/bdekruijff/fabric/pom.xml (original)
+++ sandbox/bdekruijff/fabric/pom.xml Fri Dec 31 17:13:10 2010
@@ -36,7 +36,7 @@
<Bundle-Activator>org.amdatu.core.fabric.osgi.Activator</Bundle-Activator>
<Bundle-SymbolicName>org.amdatu.core.fabric</Bundle-SymbolicName>
<DynamicImport-Package>*</DynamicImport-Package>
- <Export-Package>org.amdatu.core.fabric.cluster,
org.amdatu.core.fabric.remote</Export-Package>
+ <Export-Package>org.amdatu.core.fabric,
org.amdatu.core.fabric.cluster, org.amdatu.core.fabric.remote</Export-Package>
<Embed-Dependency>*;scope=compile</Embed-Dependency>
<Embed-Transitive>true</Embed-Transitive>
</instructions>
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java
==============================================================================
--- (empty file)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java Fri Dec 31 17:13:10 2010
@@ -0,0 +1,21 @@
+package org.amdatu.core.fabric;
+
+import java.util.Dictionary;
+
+public interface FabricManagerService {
+
+ String CONFIGURATION_PID = "org.amdatu.core.fabric";
+
+ boolean createClusterMember(String clusterGroupId, String clusterMemberId,
Dictionary<String, Object> properties);
+
+ boolean removeClusterMember(String clusterGroupId, String clusterMemberId);
+
+ boolean createDiscovery(String clusterGroupId, String serviceGroupId);
+
+ boolean removeDiscovery(String clusterGroupId, String serviceGroupId);
+
+ boolean createDistribution(String clusterGroupId, String serviceGroupId);
+
+ boolean removeDistribution(String clusterGroupId, String serviceGroupId);
+
+}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java
==============================================================================
--- sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java (original)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java Fri Dec 31 17:13:10 2010
@@ -18,5 +18,5 @@
public interface ClusterMember {
- String getId();
+ String getMemberId();
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
==============================================================================
--- sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java (original)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java Fri Dec 31 17:13:10 2010
@@ -16,28 +16,11 @@
*/
package org.amdatu.core.fabric.cluster;
-import java.util.Dictionary;
-
public interface ClusterMemberService {
- String CLUSTER_CLUSTERID_PROP = "org.amdatu.fabric.cluster.CLUSTERID";
- String CLUSTER_MEMBERID_PROP = "org.amdatu.fabric.cluster.MEMBERID";
-
- String getClusterId();
-
- String getMemberId();
-
- Dictionary<String, Object> getProperties();
-
- ClusterMember[] getClusterMembers();
-
- ClusterMember getClusterMember(String memberId);
-
- void broadcast(Object message);
-
- void send(ClusterMember[] clusterMembers, Object message);
-
- void subscribe(ClusterMessageListener clusterMessageListener);
+ String CLUSTER_CLUSTERGROUP_PROP =
"org.amdatu.fabric.cluster.CLUSTERGROUP";
+ String CLUSTER_CLUSTERMEMBER_PROP =
"org.amdatu.fabric.cluster.CLUSTERMEMBER";
- void unsubscribe(ClusterMessageListener clusterMessageListener);
+ String EVENT_TOPIC_BROADCAST = "org/amdatu/fabric/cluster/BROADCAST";
+ String EVENT_MESSAGE_PROPERTY = "org/amdatu/fabric/cluster/MESSAGE";
}
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java
==============================================================================
--- (empty file)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/LocalTopicMessage.java Fri Dec 31 17:13:10 2010
@@ -0,0 +1,25 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.cluster;
+
+/**
+ * Interface for messages that want to specify the EventAdmin topic that the
+ * ClusterMemberService should post them on at the receiver end.
+ */
+public interface LocalTopicMessage {
+ String getLocalTopic();
+}
Copied:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
(from r534,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java)
==============================================================================
--- /sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java (original)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java Fri Dec 31 17:13:10 2010
@@ -1,27 +1,83 @@
-package org.amdatu.core.fabric.cluster.internal;
+package org.amdatu.core.fabric.cluster;
import java.io.Serializable;
-public abstract class RoutedMessage implements Serializable {
-
- private final String m_targetClusterId;
- private final String[] m_targetMemberIds;
+/**
+ * Base class for messages that want to enable routing support. Target information may
+ * be set to avoid a cluster wide broadcast. The origin information will be injected by
+ * the ClusterMemberService upon broadcast/send.
+ *
+ * FIXME ServiceGroup is not a cluster concept
+ */
+public abstract class RoutableMessage implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private String m_originClusterId;
+ private String m_originMemberId;
+ private String m_originServiceGroup;
+
+ private String m_targetClusterId;
+ private String m_targetMemberId;
+ private String m_targetServiceGroup;
+
+ public RoutableMessage(final String serviceGroup) {
+ m_originServiceGroup = serviceGroup;
+ m_targetServiceGroup = serviceGroup;
+ }
- public RoutedMessage(final String clusterId, final String memberId) {
+ public RoutableMessage(final String clusterId, final String memberId,
final String serviceGroup) {
m_targetClusterId = clusterId;
- m_targetMemberIds = new String[] { memberId };
+ m_targetServiceGroup = serviceGroup;
+ m_targetMemberId = memberId;
}
- public RoutedMessage(final String clusterId, String[] memberIds) {
- m_targetClusterId = clusterId;
- m_targetMemberIds = memberIds;
+ public final String getOriginClusterId() {
+ return m_originClusterId;
+ }
+
+ public final void setOriginClusterId(String clusterId) {
+ m_originClusterId = clusterId;
+ }
+
+ public final String getOriginMemberId() {
+ return m_originMemberId;
+ }
+
+ public final void setOriginMemberId(String memberId) {
+ m_originMemberId = memberId;
}
- public final String getClusterId() {
+ public final String getOriginServiceGroup() {
+ return m_originServiceGroup;
+ }
+
+ public final void setOriginServcieGroup(String serviceGroup) {
+ m_originServiceGroup = serviceGroup;
+ }
+
+ public final String getTargetClusterId() {
return m_targetClusterId;
}
- public final String[] getMemberIds() {
- return m_targetMemberIds;
+ public final void setTargetClusterId(String clusterId) {
+ m_targetClusterId = clusterId;
+ }
+
+ public final String getTargetMemberId() {
+ return m_targetMemberId;
+ }
+
+ public final void setTargetMemberId(String memberId) {
+ m_targetMemberId = memberId;
}
+
+ public final String getTargetServiceGroup() {
+ return m_targetServiceGroup;
+ }
+
+ public final void setTargetServiceGroup(String serviceGroup) {
+ m_targetServiceGroup = serviceGroup;
+ }
+
}
\ No newline at end of file
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java
==============================================================================
--- sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java (original)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java Fri Dec 31 17:13:10 2010
@@ -20,22 +20,22 @@
public class ClusterMemberImpl implements ClusterMember {
- public final String m_id;
+ public final String m_memberId;
/********************************************************
* Constructors
********************************************************/
- public ClusterMemberImpl(final String id) {
- m_id = id;
+ public ClusterMemberImpl(final String memberId) {
+ m_memberId = memberId;
}
/********************************************************
* ClusterMember
********************************************************/
- public String getId() {
- return m_id;
+ public String getMemberId() {
+ return m_memberId;
}
/********************************************************
@@ -44,11 +44,11 @@
@Override
public boolean equals(Object obj) {
- return m_id.equals(((ClusterMemberImpl) obj).getId());
+ return m_memberId.equals(((ClusterMemberImpl) obj).getMemberId());
}
@Override
public int hashCode() {
- return m_id.hashCode();
+ return m_memberId.hashCode();
}
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelCreator.java
==============================================================================
--- sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelCreator.java (original)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelCreator.java Fri Dec 31 17:13:10 2010
@@ -62,6 +62,10 @@
}
public static Channel createChannel(String[] args) throws Exception {
+
+ // amdatu
+ byte[] domain = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
+
String bind = "auto";
int port = 4001;
String mbind = null;
@@ -169,6 +173,12 @@
else if ("-mbind".equals(args[i])) {
mbind = args[++i];
}
+ else if ("-domain".equals(args[i])) {
+ String dom = args[++i];
+ if (dom.equals("amdatu")) {
+ domain = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 1 };
+ }
+ }
}
System.out.println("Creating receiver class=" + receiver);
@@ -248,7 +258,6 @@
channel.addInterceptor(smi);
}
- byte[] domain = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
((McastService) channel.getMembershipService()).setDomain(domain);
DomainFilterInterceptor filter = new DomainFilterInterceptor();
filter.setDomain(domain);
Copied:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java
(from r534,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java)
==============================================================================
--- /sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java (original)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/BaseClusterMemberService.java Fri Dec 31 17:13:10 2010
@@ -19,42 +19,51 @@
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Hashtable;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMember;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.ClusterMessageListener;
+import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.RoutableMessage;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.apache.felix.dm.ServiceDependency;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.osgi.service.log.LogService;
/**
* I manage cluster state
*/
-public abstract class AbstractClusterMemberService implements
ClusterMemberService {
+public abstract class BaseClusterMemberService implements ClusterMemberService
{
private final Map<String, ClusterMember> m_clusterMembers = new
HashMap<String, ClusterMember>();
- private final Set<ClusterMessageListener> m_clusterMessageListeners = new
HashSet<ClusterMessageListener>();
-
private final ReentrantReadWriteLock m_clusterMembersLock = new
ReentrantReadWriteLock();
- private final ReentrantReadWriteLock m_clusterMessageListenersLock = new
ReentrantReadWriteLock();
-
- private final ExecutorService m_executorService =
Executors.newFixedThreadPool(1);
private final String m_clusterId;
private final String m_memberId;
private final Map<String, Object> m_properties;
+ // injected
+ private volatile DependencyManager m_dependencyManager;
+ private volatile Component m_component;
+ private volatile EventAdmin m_eventAdmin;
+ private volatile LogService m_logService;
+
+ private volatile Component m_broadcastEventHandlerComponent;
+
/********************************************************
* Constructors
********************************************************/
- public AbstractClusterMemberService(String clusterId, String memberId,
Dictionary<String, Object> properties) {
- m_clusterId = clusterId;
- m_memberId = memberId;
+ public BaseClusterMemberService(String clusterGroupId, String
clusterMemberId,
+ Dictionary<String, Object> properties) {
+ m_clusterId = clusterGroupId;
+ m_memberId = clusterMemberId;
m_properties = new HashMap<String, Object>();
if (properties != null) {
Enumeration<String> enumeration = properties.keys();
@@ -66,6 +75,54 @@
}
/********************************************************
+ * Service lifecycle
+ ********************************************************/
+
+ public final synchronized void init() {
+ @SuppressWarnings("unchecked")
+ Dictionary<String, Object> serviceProps =
m_component.getServiceProperties();
+ if (serviceProps == null) {
+ serviceProps = new Hashtable<String, Object>();
+ }
+ serviceProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
m_clusterId);
+ serviceProps.put(ClusterMemberService.CLUSTER_CLUSTERMEMBER_PROP,
m_memberId);
+ m_component.setServiceProperties(serviceProps);
+
+ ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
+ logServiceDependency.setService(LogService.class);
+ logServiceDependency.setRequired(true);
+ m_component.add(logServiceDependency);
+
+ ServiceDependency eventAdminServiceDependency =
m_dependencyManager.createServiceDependency();
+ eventAdminServiceDependency.setService(EventAdmin.class);
+ eventAdminServiceDependency.setRequired(true);
+ m_component.add(eventAdminServiceDependency);
+
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(EventConstants.EVENT_TOPIC, new String[] {
EVENT_TOPIC_BROADCAST + "/" + m_clusterId });
+ m_broadcastEventHandlerComponent =
m_dependencyManager.createComponent();
+
m_broadcastEventHandlerComponent.setInterface(EventHandler.class.getName(),
props);
+ m_broadcastEventHandlerComponent.setImplementation(new
BroadcastEventHandler());
+ onInit();
+ }
+
+ public final synchronized void destroy() {
+ onDestroy();
+ }
+
+ public final synchronized void start() {
+ m_logService.log(LogService.LOG_WARNING, "Starting
ClusterMemberService");
+ m_dependencyManager.add(m_broadcastEventHandlerComponent);
+ onStart();
+ }
+
+ public final synchronized void stop() {
+ m_logService.log(LogService.LOG_WARNING, "Stopping
ClusterMemberService");
+ m_dependencyManager.remove(m_broadcastEventHandlerComponent);
+ onStop();
+ }
+
+ /********************************************************
* ClusterMemberService
********************************************************/
@@ -102,44 +159,18 @@
}
}
- public final void broadcast(Object message) {
- m_executorService.submit(new BroadcastRunnable(message));
- }
-
- public final void send(ClusterMember[] clusterMembers, Object message) {
- m_executorService.submit(new SendRunnable(clusterMembers, message));
- }
-
- public final void subscribe(ClusterMessageListener clusterMessageListener)
{
- m_clusterMessageListenersLock.writeLock().lock();
- try {
- m_clusterMessageListeners.add(clusterMessageListener);
- }
- finally {
- m_clusterMessageListenersLock.writeLock().unlock();
- }
- onSubscribe(clusterMessageListener);
- }
-
- public final void unsubscribe(ClusterMessageListener
clusterMessageListener) {
- m_clusterMessageListenersLock.writeLock().lock();
- try {
- m_clusterMessageListeners.remove(clusterMessageListener);
- }
- finally {
- m_clusterMessageListenersLock.writeLock().unlock();
- }
- onUnsubscribe(clusterMessageListener);
- }
-
/********************************************************
* for implementing concrete classes
********************************************************/
+ protected final LogService getLogService() {
+ return m_logService;
+ }
+
protected final void addClusterMember(ClusterMember clusterMember) {
m_clusterMembersLock.writeLock().lock();
try {
- m_clusterMembers.put(clusterMember.getId(), clusterMember);
+ m_clusterMembers.put(clusterMember.getMemberId(), clusterMember);
}
finally {
m_clusterMembersLock.writeLock().unlock();
@@ -149,7 +180,7 @@
protected final void removeClusterMember(ClusterMember clusterMember) {
m_clusterMembersLock.writeLock().lock();
try {
- m_clusterMembers.remove(clusterMember.getId());
+ m_clusterMembers.remove(clusterMember.getMemberId());
}
finally {
m_clusterMembersLock.writeLock().unlock();
@@ -157,71 +188,74 @@
}
protected final void dispatchMessage(Object message) {
- m_clusterMessageListenersLock.readLock().lock();
- try {
- for (ClusterMessageListener clm : m_clusterMessageListeners) {
- m_executorService.submit(new MessageDispatchRunnable(clm,
message));
- }
- }
- finally {
- m_clusterMessageListenersLock.readLock().unlock();
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(EVENT_MESSAGE_PROPERTY, message);
+ // FIXME DO NOT REBROADCAST
+ String topic = EVENT_TOPIC_BROADCAST;
+ if (message instanceof LocalTopicMessage) {
+ topic = ((LocalTopicMessage) message).getLocalTopic();
}
+ Event broadCastEvent = new Event(topic, props);
+ m_eventAdmin.postEvent(broadCastEvent);
}
- protected void onSubscribe(ClusterMessageListener clusterMessageListener) {
- }
+ protected abstract void onInit();
- protected void onUnsubscribe(ClusterMessageListener
clusterMessageListener) {
- }
+ protected abstract void onDestroy();
+
+ protected abstract void onStart();
- public abstract void doBroadcast(Object message);
+ protected abstract void onStop();
- public abstract void doSend(ClusterMember[] clusterMember, Object message);
+ protected abstract void doBroadcast(Object message);
+
+ protected abstract void doSend(ClusterMember[] clusterMember, Object
message);
/********************************************************
* helper classes
********************************************************/
- class BroadcastRunnable implements Runnable {
-
- private final Object m_message;
-
- public BroadcastRunnable(final Object message) {
- m_message = message;
- }
-
- public void run() {
- doBroadcast(m_message);
- }
- }
+ class BroadcastEventHandler implements EventHandler {
- class SendRunnable implements Runnable {
-
- private final ClusterMember[] m_clusterMember;
- private final Object m_message;
-
- public SendRunnable(final ClusterMember[] clusterMembers, final Object
message) {
- m_clusterMember = clusterMembers;
- m_message = message;
- }
-
- public void run() {
- doSend(m_clusterMember, m_message);
- }
- }
-
- static class MessageDispatchRunnable implements Runnable {
-
- private final ClusterMessageListener m_clusterMessageListener;
- private final Object m_message;
-
- public MessageDispatchRunnable(final ClusterMessageListener
clusterMessageListener, final Object message) {
- m_clusterMessageListener = clusterMessageListener;
- m_message = message;
- }
-
- public void run() {
- m_clusterMessageListener.recieveMessage(m_message);
+ public void handleEvent(Event event) {
+ Object message = event.getProperty(EVENT_MESSAGE_PROPERTY);
+ if (message instanceof RoutableMessage) {
+
+ RoutableMessage routableMessage = (RoutableMessage) message;
+ routableMessage.setOriginClusterId(m_clusterId);
+ routableMessage.setOriginMemberId(m_memberId);
+
+ if (routableMessage.getTargetClusterId() == null) {
+ routableMessage.setTargetClusterId(m_clusterId);
+ }
+ else {
+ // FIXME address this
+ if
(!routableMessage.getTargetClusterId().equals(getClusterId())) {
+ m_logService.log(LogService.LOG_ERROR,
RoutableMessage.class.getSimpleName()
+ + " is not for this cluster: " +
routableMessage.getTargetClusterId());
+ }
+ }
+
+ ClusterMember clusterMember = null;
+ if (routableMessage.getTargetMemberId() != null) {
+ clusterMember =
getClusterMember(routableMessage.getTargetMemberId());
+ if (clusterMember == null) {
+ // FIXME address this
+ m_logService.log(LogService.LOG_ERROR, "RoutedMessage
specifies unknown target member: "
+ + routableMessage.getTargetMemberId());
+ }
+ }
+
+ if (clusterMember != null) {
+ doSend(new ClusterMember[] { clusterMember }, message);
+ }
+ else {
+ doBroadcast(message);
+ }
+ }
+ else {
+ doBroadcast(message);
+ }
}
}
}
Copied:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
(from r534,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java)
==============================================================================
--- /sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java (original)
+++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java Fri Dec 31 17:13:10 2010
@@ -31,93 +31,110 @@
import org.amdatu.core.fabric.cluster.ClusterMember;
import org.amdatu.core.fabric.cluster.internal.ClusterMemberImpl;
import org.amdatu.core.fabric.cluster.internal.tribes.ChannelCreator;
-import org.amdatu.core.fabric.cluster.service.AbstractClusterMemberService;
+import org.amdatu.core.fabric.cluster.service.BaseClusterMemberService;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
+import org.osgi.service.log.LogService;
-public final class ClusterMemberServiceImpl extends
AbstractClusterMemberService implements MembershipListener,
- ChannelListener {
+public final class TribesClusterMemberServiceImpl extends
BaseClusterMemberService {
public static final String CLUSTER_TRIBES_ARGS_PROP =
"org.amdatu.fabric.cluster.tribes.args";
private final Map<ClusterMember, Member> m_clusterMemberMembers = new
HashMap<ClusterMember, Member>();
-
private final ReentrantReadWriteLock m_clusterMemberMembersLock = new
ReentrantReadWriteLock();
- private ManagedChannel m_managedChannel;
+ private volatile ManagedChannel m_managedChannel;
/********************************************************
* Constructors
********************************************************/
- public ClusterMemberServiceImpl(String clusterId, String memberId,
Dictionary<String, Object> properties) {
- super(clusterId, memberId, properties);
+ public TribesClusterMemberServiceImpl(String clusterGroupId, String
clusterMemberId,
+ Dictionary<String, Object> properties) {
+ super(clusterGroupId, clusterMemberId, properties);
}
/********************************************************
* Service lifecycle
********************************************************/
- public void start() {
+ protected synchronized void onInit() {
+ }
+
+ protected synchronized void onDestroy() {
+ }
+
+ protected synchronized void onStart() {
try {
+ getLogService().log(LogService.LOG_DEBUG, "Starting managed
channel");
m_managedChannel =
(ManagedChannel) ChannelCreator.createChannel((String[])
getProperties().get(
CLUSTER_TRIBES_ARGS_PROP));
Properties props = new Properties();
- props.setProperty(CLUSTER_MEMBERID_PROP, getMemberId());
-
- m_managedChannel.addMembershipListener(this);
- m_managedChannel.addChannelListener(this);
+ props.setProperty(CLUSTER_CLUSTERMEMBER_PROP, getMemberId());
+ m_managedChannel.addMembershipListener(new
TribesMembershipListener());
+ m_managedChannel.addChannelListener(new TribesChannelListener());
m_managedChannel.getMembershipService().setPayload(getPayload(props));
-
m_managedChannel.start(Channel.DEFAULT);
}
catch (Exception e) {
- e.printStackTrace();
+ getLogService().log(LogService.LOG_ERROR, "Exception while
starting managed channel", e);
}
}
- public void stop() {
+ protected synchronized void onStop() {
try {
+ getLogService().log(LogService.LOG_DEBUG, "Stopping managed
channel");
m_managedChannel.stop(Channel.DEFAULT);
}
- catch (Exception x) {
- x.printStackTrace();
+ catch (Exception e) {
+ getLogService().log(LogService.LOG_ERROR, "Exception while
stopping managed channel", e);
}
}
/********************************************************
- * ClusterMemberService
+ * ClusterMemberService interface
********************************************************/
@Override
public void doBroadcast(Object message) {
- // TODO check and wrap message. Look into send options
if (message instanceof Serializable) {
try {
Member[] members = m_managedChannel.getMembers();
- if (members.length > 0)
+ if (members.length > 0) {
m_managedChannel.send(members, (Serializable) message,
Channel.SEND_OPTIONS_ASYNCHRONOUS);
+ }
+ else {
+ getLogService().log(
+ LogService.LOG_WARNING,
+ "Dropping message during broadcast because there are
no members on my channel: "
+ + message.toString());
+ }
}
catch (ChannelException e) {
- e.printStackTrace();
+ getLogService().log(LogService.LOG_ERROR, "Exception during
send on managed channel", e);
}
}
+ else {
+ getLogService().log(LogService.LOG_ERROR,
+ "Dropping message of type " + message.getClass().getName() + "
because it is not Serializable: "
+ + message.toString());
+ }
}
@Override
public void doSend(ClusterMember[] clusterMembers, Object message) {
- // TODO check and wrap message. Look into send options
if (message instanceof Serializable) {
try {
List<Member> members = new LinkedList<Member>();
- synchronized (m_clusterMemberMembers) {
+ m_clusterMemberMembersLock.readLock().lock();
+ try {
for (ClusterMember clusterMember : clusterMembers) {
Member member =
m_clusterMemberMembers.get(clusterMember);
if (member != null) {
@@ -125,93 +142,106 @@
}
}
}
- if (members.size() > 0)
+ finally {
+ m_clusterMemberMembersLock.readLock().unlock();
+ }
+ if (members.size() > 0) {
m_managedChannel.send(members.toArray(new
Member[members.size()]), (Serializable) message,
Channel.SEND_OPTIONS_ASYNCHRONOUS);
+ }
+ else {
+ getLogService().log(LogService.LOG_WARNING,
+ "Dropping message during send because there are no matching members on my channel: "
+ + message.toString());
+ }
}
catch (ChannelException e) {
- e.printStackTrace();
+ getLogService().log(LogService.LOG_ERROR, "Exception during send on managed channel", e);
}
}
}
/********************************************************
- * MembershipListener
+ * Utility methods
********************************************************/
- public void memberAdded(Member member) {
- try {
- ClusterMember clusterMember = new
ClusterMemberImpl(getProperties(member.getPayload())
- .getProperty(CLUSTER_MEMBERID_PROP));
- m_clusterMemberMembersLock.writeLock().lock();
+ private byte[] getPayload(Properties props) throws IOException {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ props.store(bout, "");
+ return bout.toByteArray();
+ }
+
+ private Properties getProperties(byte[] payload) throws IOException {
+ ByteArrayInputStream bin = new ByteArrayInputStream(payload);
+ Properties props = new Properties();
+ props.load(bin);
+ return props;
+ }
+
+ /********************************************************
+ * Helper classes
+ ********************************************************/
+
+ class TribesMembershipListener implements MembershipListener {
+
+ public void memberAdded(Member member) {
try {
- m_clusterMemberMembers.put(clusterMember, member);
+ getLogService().log(LogService.LOG_DEBUG, "Member added: " + member.toString());
+ ClusterMember clusterMember = new
ClusterMemberImpl(getProperties(member.getPayload())
+ .getProperty(CLUSTER_CLUSTERMEMBER_PROP));
+ m_clusterMemberMembersLock.writeLock().lock();
+ try {
+ m_clusterMemberMembers.put(clusterMember, member);
+ addClusterMember(clusterMember);
+ }
+ finally {
+ m_clusterMemberMembersLock.writeLock().unlock();
+ }
}
- finally {
- m_clusterMemberMembersLock.writeLock().unlock();
+ catch (Exception e) {
+ getLogService().log(LogService.LOG_ERROR, "Exception while adding member: " + member.toString(), e);
}
- addClusterMember(clusterMember);
- }
- catch (Exception x) {
- x.printStackTrace();
}
- }
- public void memberDisappeared(Member member) {
- try {
- // FIXME use memberid to tuple map to reduce object creation
- String memberId = getProperties(member.getPayload())
- .getProperty(CLUSTER_MEMBERID_PROP);
- ClusterMember toBeRemoved = null;
- m_clusterMemberMembersLock.writeLock().lock();
+ public void memberDisappeared(Member member) {
try {
- for (ClusterMember clusterMember :
m_clusterMemberMembers.keySet()) {
- if (clusterMember.getId().equals(memberId)) {
- toBeRemoved = clusterMember;
- break;
+ // FIXME use memberid to tuple map to reduce object creation
+ getLogService().log(LogService.LOG_DEBUG, "Member disappeared: " + member.toString());
+ String memberId = getProperties(member.getPayload())
+ .getProperty(CLUSTER_CLUSTERMEMBER_PROP);
+ ClusterMember toBeRemoved = null;
+ m_clusterMemberMembersLock.writeLock().lock();
+ try {
+ for (ClusterMember clusterMember :
m_clusterMemberMembers.keySet()) {
+ if (clusterMember.getMemberId().equals(memberId)) {
+ toBeRemoved = clusterMember;
+ break;
+ }
+ }
+ if (toBeRemoved != null) {
+ removeClusterMember(toBeRemoved);
+ m_clusterMemberMembers.remove(toBeRemoved);
}
}
- if (toBeRemoved != null) {
- m_clusterMemberMembers.remove(toBeRemoved);
+ finally {
+ m_clusterMemberMembersLock.writeLock().unlock();
}
}
- finally {
- m_clusterMemberMembersLock.writeLock().unlock();
+ catch (Exception e) {
+ getLogService().log(LogService.LOG_ERROR, "Exception while removing member: " + member.toString(), e);
}
- removeClusterMember(toBeRemoved);
}
- catch (Exception x) {
- x.printStackTrace();
- }
- }
-
- /********************************************************
- * ChannelListener
- ********************************************************/
-
- public boolean accept(Serializable message, Member arg1) {
- return true;
- }
-
- public void messageReceived(Serializable message, Member member) {
- dispatchMessage(message);
}
- /********************************************************
- * private
- ********************************************************/
+ class TribesChannelListener implements ChannelListener {
- private byte[] getPayload(Properties props) throws IOException {
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- props.store(bout, "");
- return bout.toByteArray();
- }
+ public boolean accept(Serializable message, Member member) {
+ return true;
+ }
- private Properties getProperties(byte[] payload) throws IOException {
- ByteArrayInputStream bin = new ByteArrayInputStream(payload);
- Properties props = new Properties();
- props.load(bin);
- return props;
+ public void messageReceived(Serializable message, Member member) {
+ dispatchMessage(message);
+ }
}
static class ClusterMemberMemberTuple {
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
Fri Dec 31 17:13:10 2010
@@ -17,98 +17,21 @@
*/
package org.amdatu.core.fabric.osgi;
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.UUID;
-
-import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.ClusterMessageService;
-import org.amdatu.core.fabric.cluster.service.ClusterMessageServiceImpl;
-import org.amdatu.core.fabric.cluster.service.tribes.ClusterMemberServiceImpl;
-import org.amdatu.core.fabric.remote.DiscoveryService;
-import org.amdatu.core.fabric.remote.DistributionService;
-import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
-import org.amdatu.core.fabric.remote.RemoteServiceEndPoint;
-import org.amdatu.core.fabric.remote.service.DiscoveryServiceImpl;
-import org.amdatu.core.fabric.remote.service.DistributionServiceImpl;
+import org.amdatu.core.fabric.FabricManagerService;
+import org.amdatu.core.fabric.service.FabricManagerServiceImpl;
import org.apache.felix.dm.DependencyActivatorBase;
import org.apache.felix.dm.DependencyManager;
import org.osgi.framework.BundleContext;
-import org.osgi.service.log.LogService;
public class Activator extends DependencyActivatorBase {
@Override
public void init(BundleContext context, DependencyManager manager) throws
Exception {
- // Generic props
-
- String memberid = UUID.randomUUID().toString();
- Dictionary<String, Object> cm1props = new Hashtable<String, Object>();
- cm1props.put(ClusterMemberService.CLUSTER_CLUSTERID_PROP, "CLUSTER1");
- cm1props.put(ClusterMemberService.CLUSTER_MEMBERID_PROP, memberid);
- cm1props
- .put(ClusterMemberServiceImpl.CLUSTER_TRIBES_ARGS_PROP, new
String[] { "-port", "8880", "-throughput" });
-
- // ClusterMemberService
-
- manager.add(
- createComponent()
- .setImplementation(new ClusterMemberServiceImpl("CLUSTER1",
memberid, cm1props))
- .setInterface(ClusterMemberService.class.getName(), cm1props)
-
.add(createServiceDependency().setService(LogService.class).setRequired(false)));
-
- // ClusterMessageService
-
- manager.add(
- createComponent()
- .setImplementation(new ClusterMessageServiceImpl())
- .setInterface(ClusterMessageService.class.getName(), cm1props)
- .add(
-
createServiceDependency().setService(ClusterMemberService.class).setRequired(true))
-
.add(createServiceDependency().setService(LogService.class).setRequired(false)));
-
- // DiscoveryService
-
- manager.add(
- createComponent()
- .setImplementation(new DiscoveryServiceImpl())
- .setInterface(DiscoveryService.class.getName(), cm1props)
- .add(
-
createServiceDependency().setService(ClusterMemberService.class).setRequired(true))
- .add(
-
createServiceDependency().setService(ClusterMessageService.class).setRequired(true))
- .add(
-
createServiceDependency().setService(RemotableServiceEndpoint.class)
- .setCallbacks("remotableServiceEndPointAdded",
"remotableServiceEndPointRemoved")
- .setRequired(false))
-
.add(createServiceDependency().setService(LogService.class).setRequired(false)));
-
- // DistributionService
-
- Dictionary<String, Object> distributionProps = new Hashtable<String,
Object>();
-
- manager.add(
- createComponent()
- .setImplementation(new DistributionServiceImpl())
- .setInterface(DistributionService.class.getName(),
distributionProps)
- .add(
-
createServiceDependency().setService(ClusterMemberService.class).setRequired(true))
- .add(
-
createServiceDependency().setService(ClusterMessageService.class).setRequired(true))
- .add(
- createServiceDependency()
- .setService(
- "("
- +
DistributionService.SERVICE_EXPORTED_CONFIGS_PROP + "="
- +
DistributionService.SERVICE_CONFIGURATION_TYPE + ")")
- .setCallbacks("localRemotableServiceAdded",
"localRemotableServiceRemoved")
- .setRequired(false))
- .add(
-
createServiceDependency().setService(RemoteServiceEndPoint.class)
- .setCallbacks("remoteServiceEndPointAdded",
"remoteServiceEndPointRemoved")
- .setRequired(false))
-
.add(createServiceDependency().setService(LogService.class).setRequired(false)));
+ manager.add(createComponent()
+ .setInterface(FabricManagerService.class.getName(), null)
+ .setImplementation(new FabricManagerServiceImpl())
+
.add(createConfigurationDependency().setPid(FabricManagerService.CONFIGURATION_PID)));
}
@Override
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DiscoveryService.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DiscoveryService.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DiscoveryService.java
Fri Dec 31 17:13:10 2010
@@ -18,9 +18,6 @@
public interface DiscoveryService {
- String DISCOVERY_TOPIC = "org.amdatu.fabric.remote.DISCOVERY";
-
- ServiceEndPoint[] getLocalServiceEndPoints();
-
- ServiceEndPoint[] getRemoteServiceEndPoints();
+ String REMOTE_SERVICEGROUPID_PROP =
"org.amdatu.fabric.remote.SERVICEGROUP";
+ String EVENT_TOPIC_DISCOVERY = "org/amdatu/fabric/remote/DISCOVERY";
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
Fri Dec 31 17:13:10 2010
@@ -19,6 +19,9 @@
public interface DistributionService {
String REMOTE_TOPIC = "org.amdatu.fabric.remote.REMOTE";
+ String EVENT_TOPIC_REMOTE = "org/amdatu/fabric/remote/REMOTE";
+ String EVENT_TOPIC_INVOKE = "org/amdatu/fabric/remote/INVOKE";
+ String EVENT_TOPIC_RESPONSE = "org/amdatu/fabric/remote/RESPONSE";
// see OSGi R42 spec 13.5.1
// FIXME we are not actually doing anything to support these intents
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/RemoteServiceEndPoint.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/RemoteServiceEndPoint.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/RemoteServiceEndPoint.java
Fri Dec 31 17:13:10 2010
@@ -19,5 +19,4 @@
public interface RemoteServiceEndPoint {
ServiceEndPoint getServiceEndPoint();
-
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
Fri Dec 31 17:13:10 2010
@@ -20,10 +20,13 @@
import java.util.Enumeration;
import java.util.Hashtable;
-public class ServiceEndPoint implements Serializable {
+public final class ServiceEndPoint implements Serializable {
+
+ private static final long serialVersionUID = 1L;
private String m_clusterId;
private String m_memberId;
+ private String m_serviceGroup;
private String[] m_objectClass;
private long m_originalServiceId;
private Hashtable<String, Object> m_properties;
@@ -47,6 +50,14 @@
m_memberId = memberId;
}
+ public String getServiceGroup() {
+ return m_serviceGroup;
+ }
+
+ public void setServiceGroup(String serviceGroup) {
+ m_serviceGroup = serviceGroup;
+ }
+
public String[] getObjectClass() {
return m_objectClass;
}
@@ -74,9 +85,21 @@
@Override
public boolean equals(Object obj) {
ServiceEndPoint other = (ServiceEndPoint) obj;
- if (!getClusterId().equals(other.getClusterId()))
+ if ((getServiceGroup() == null && other.getServiceGroup() != null)
+ || (getServiceGroup() != null &&
other.getServiceGroup() == null))
+ return false;
+ if ((getClusterId() == null && other.getClusterId() != null)
+ || (getClusterId() != null && other.getClusterId() ==
null))
+ return false;
+ if ((getMemberId() == null && other.getMemberId() != null)
+ || (getMemberId() != null && other.getMemberId() ==
null))
+ return false;
+ if (!(getServiceGroup() == null && other.getServiceGroup() == null) &&
!getServiceGroup().equals(
+ other.getServiceGroup()))
+ return false;
+ if (!(getClusterId() == null && other.getClusterId() == null) &&
!getClusterId().equals(other.getClusterId()))
return false;
- if (!getMemberId().equals(other.getMemberId()))
+ if (!(getMemberId() == null && other.getMemberId() == null) &&
!getMemberId().equals(other.getMemberId()))
return false;
if (getOriginalServiceId() != other.getOriginalServiceId())
return false;
@@ -85,7 +108,15 @@
@Override
public int hashCode() {
- return (getClusterId() + getMemberId() +
getOriginalServiceId()).hashCode();
+ String key = "";
+ if (getServiceGroup() != null)
+ key += getServiceGroup();
+ if (getClusterId() != null)
+ key += getClusterId();
+ if (getMemberId() != null)
+ key += getMemberId();
+ key += getOriginalServiceId();
+ return key.hashCode();
}
@Override
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
==============================================================================
--- (empty file)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
Fri Dec 31 17:13:10 2010
@@ -0,0 +1,10 @@
+package org.amdatu.core.fabric.remote.internal;
+
+import org.amdatu.core.fabric.remote.DiscoveryService;
+
+public final class DiscoveryUtilities {
+
+ public static String getLocalDiscoveryTopic(final String clusterGroup,
final String serviceGroup) {
+ return DiscoveryService.EVENT_TOPIC_DISCOVERY + "/" + clusterGroup +
"/" + serviceGroup;
+ }
+}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DistributionUtilities.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DistributionUtilities.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DistributionUtilities.java
Fri Dec 31 17:13:10 2010
@@ -26,6 +26,14 @@
public final class DistributionUtilities {
+ public static String getLocalInvokeTopic(final String clusterGroup, final
String serviceGroup) {
+ return DistributionService.EVENT_TOPIC_INVOKE + "/" + clusterGroup +
"/" + serviceGroup;
+ }
+
+ public static String getLocalResponseTopic(final String clusterGroup,
final String serviceGroup) {
+ return DistributionService.EVENT_TOPIC_RESPONSE + "/" + clusterGroup +
"/" + serviceGroup;
+ }
+
public static boolean isConfigurationTypeSupported(Dictionary<String,
Object> serviceRegistrationProperties) {
if (serviceRegistrationProperties == null) {
return false;
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
Fri Dec 31 17:13:10 2010
@@ -18,17 +18,29 @@
import java.io.Serializable;
+import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.RoutableMessage;
import org.amdatu.core.fabric.remote.ServiceEndPoint;
-public class EndpointDepublishMessage implements Serializable {
+public class EndpointDepublishMessage extends RoutableMessage implements
LocalTopicMessage, Serializable {
- private ServiceEndPoint m_serviceEndPoint;
+ private static final long serialVersionUID = 1L;
+
+ private final ServiceEndPoint m_serviceEndPoint;
public EndpointDepublishMessage(ServiceEndPoint serviceEndPoint) {
+ super(serviceEndPoint.getServiceGroup());
m_serviceEndPoint = serviceEndPoint;
}
public ServiceEndPoint getServiceEndPoint() {
+ m_serviceEndPoint.setClusterId(getOriginClusterId());
+ m_serviceEndPoint.setMemberId(getOriginMemberId());
return m_serviceEndPoint;
}
+
+ public String getLocalTopic() {
+ return
DiscoveryUtilities.getLocalDiscoveryTopic(m_serviceEndPoint.getClusterId(),
+ m_serviceEndPoint.getServiceGroup());
+ }
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
Fri Dec 31 17:13:10 2010
@@ -18,8 +18,22 @@
import java.io.Serializable;
-public class EndpointDiscoveryMessage implements Serializable {
+import org.amdatu.core.fabric.cluster.LocalTopicMessage;
- public EndpointDiscoveryMessage() {
+public class EndpointDiscoveryMessage implements LocalTopicMessage,
Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String m_clusterId;
+ private final String m_serviceGroup;
+
+ public EndpointDiscoveryMessage(String clusterId, String serviceGroup) {
+ m_clusterId = clusterId;
+ m_serviceGroup = serviceGroup;
+ }
+
+ public String getLocalTopic() {
+ return DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterId,
+ m_serviceGroup);
}
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
Fri Dec 31 17:13:10 2010
@@ -16,19 +16,21 @@
*/
package org.amdatu.core.fabric.remote.internal;
-import java.io.Serializable;
import java.util.Map;
-import org.amdatu.core.fabric.cluster.internal.RoutedMessage;
+import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.RoutableMessage;
import org.amdatu.core.fabric.remote.ServiceEndPoint;
-public class EndpointInvokeMessage extends RoutedMessage implements
Serializable {
+public class EndpointInvokeMessage extends RoutableMessage implements
LocalTopicMessage {
- private ServiceEndPoint m_serviceEndPoint;
- private Map<String, Object> m_payload;
+ private static final long serialVersionUID = 1L;
+
+ private final ServiceEndPoint m_serviceEndPoint;
+ private final Map<String, Object> m_payload;
public EndpointInvokeMessage(ServiceEndPoint serviceEndPoint, Map<String,
Object> payload) {
- super(serviceEndPoint.getClusterId(), serviceEndPoint.getMemberId());
+ super(serviceEndPoint.getClusterId(), serviceEndPoint.getMemberId(),
serviceEndPoint.getServiceGroup());
m_serviceEndPoint = serviceEndPoint;
m_payload = payload;
}
@@ -40,4 +42,9 @@
public Map<String, Object> getPayload() {
return m_payload;
}
+
+ public String getLocalTopic() {
+ return DistributionUtilities.getLocalInvokeTopic(getTargetClusterId(),
+ getTargetServiceGroup());
+ }
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
Fri Dec 31 17:13:10 2010
@@ -16,19 +16,29 @@
*/
package org.amdatu.core.fabric.remote.internal;
-import java.io.Serializable;
-
+import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.RoutableMessage;
import org.amdatu.core.fabric.remote.ServiceEndPoint;
-public class EndpointPublishMessage implements Serializable {
+public class EndpointPublishMessage extends RoutableMessage implements
LocalTopicMessage {
+
+ private static final long serialVersionUID = 1L;
private ServiceEndPoint m_serviceEndPoint;
public EndpointPublishMessage(ServiceEndPoint serviceEndPoint) {
+ super(serviceEndPoint.getServiceGroup());
m_serviceEndPoint = serviceEndPoint;
}
public ServiceEndPoint getServiceEndPoint() {
+ m_serviceEndPoint.setClusterId(getOriginClusterId());
+ m_serviceEndPoint.setMemberId(getOriginMemberId());
return m_serviceEndPoint;
}
+
+ public String getLocalTopic() {
+ return
DiscoveryUtilities.getLocalDiscoveryTopic(m_serviceEndPoint.getClusterId(),
+ m_serviceEndPoint.getServiceGroup());
+ }
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
Fri Dec 31 17:13:10 2010
@@ -16,23 +16,30 @@
*/
package org.amdatu.core.fabric.remote.internal;
-import java.io.Serializable;
import java.util.Map;
-import org.amdatu.core.fabric.cluster.internal.RoutedMessage;
-import org.amdatu.core.fabric.remote.ServiceEndPoint;
+import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.RoutableMessage;
-public class EndpointResponseMessage extends RoutedMessage implements
Serializable {
+public class EndpointResponseMessage extends RoutableMessage implements
LocalTopicMessage {
+
+ private static final long serialVersionUID = 1L;
private Map<String, Object> m_payload;
- public EndpointResponseMessage(final String originClusterId, final String
originMemberId,
+ public EndpointResponseMessage(final String targetClusterId, final String
targetMemberId,
+ final String targetServiceGroup,
final Map<String, Object> payload) {
- super(originClusterId, originMemberId);
+ super(targetClusterId, targetMemberId, targetServiceGroup);
m_payload = payload;
}
public Map<String, Object> getPayload() {
return m_payload;
}
+
+ public String getLocalTopic() {
+ return
DistributionUtilities.getLocalResponseTopic(getTargetClusterId(),
+ getTargetServiceGroup());
+ }
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
Fri Dec 31 17:13:10 2010
@@ -19,27 +19,32 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.ClusterMessageService;
-import org.amdatu.core.fabric.cluster.ClusterTopicListener;
-import org.amdatu.core.fabric.remote.DistributionService;
import org.amdatu.core.fabric.remote.ServiceEndPoint;
import org.amdatu.core.fabric.remote.service.DistributionServiceImpl;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.apache.felix.dm.ServiceDependency;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.osgi.service.log.LogService;
/**
* I am a delegate to the DistributionService
* I proxy local service invocations and put them over the cluster
*/
-public final class LocalServiceInvocationHandler implements InvocationHandler,
ClusterTopicListener {
+public final class LocalServiceInvocationHandler implements InvocationHandler {
private final static long INVOCATION_TIMEOUT = 100;
@@ -49,27 +54,32 @@
private final ReentrantReadWriteLock m_invocationIdentifiersLock = new
ReentrantReadWriteLock();
private final ReentrantReadWriteLock m_invocationResponsesLock = new
ReentrantReadWriteLock();
- private volatile ClusterMemberService m_clusterMemberService;
- private volatile ClusterMessageService m_clusterMessageService;
+ private volatile DependencyManager m_dependencyManager;
+ private volatile Component m_component;
+ private volatile EventAdmin m_eventAdmin;
+ private volatile LogService m_logService;
+
+ private volatile Component m_serviceInvocationEventHandlerComponent;
private final ServiceEndPoint m_serviceEndpoint;
- private final Class<?>[] m_interfaceClasses;
- private final Set<Method> m_interfaceMethods = new HashSet<Method>();
+ private final Set<Method> m_serviceInterfaceMethods = new
HashSet<Method>();
- private String m_serviceEndpointTopic;
+ private final String m_clusterGroupId;
+ private final String m_serviceGroupId;
/********************************************************
* Constructors
********************************************************/
- public LocalServiceInvocationHandler(ServiceEndPoint serviceEndpoint,
Class<?>[] interfaceClasses) {
+ public LocalServiceInvocationHandler(String clusterGroupId, String
serviceGroupId,
+ ServiceEndPoint serviceEndpoint,
+ Class<?>[] interfaceClasses) {
+ m_clusterGroupId = clusterGroupId;
+ m_serviceGroupId = serviceGroupId;
m_serviceEndpoint = serviceEndpoint;
- m_interfaceClasses = interfaceClasses;
- m_serviceEndpointTopic = DistributionService.REMOTE_TOPIC;
-
for (Class<?> interfaceClass : interfaceClasses) {
for (Method serviceMethod : interfaceClass.getMethods()) {
- m_interfaceMethods.add(serviceMethod);
+ m_serviceInterfaceMethods.add(serviceMethod);
}
}
}
@@ -86,25 +96,58 @@
* Service lifecycle
********************************************************/
+ public synchronized void init() {
+ ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
+ logServiceDependency.setService(LogService.class);
+ logServiceDependency.setRequired(true);
+ m_component.add(logServiceDependency);
+
+ ServiceDependency eventAdminServiceDependency =
m_dependencyManager.createServiceDependency();
+ eventAdminServiceDependency.setService(EventAdmin.class);
+ eventAdminServiceDependency.setRequired(true);
+ m_component.add(eventAdminServiceDependency);
+
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(EventConstants.EVENT_TOPIC,
+ new String[] {
DistributionUtilities.getLocalResponseTopic(m_clusterGroupId, m_serviceGroupId)
});
+ m_serviceInvocationEventHandlerComponent =
m_dependencyManager.createComponent();
+
m_serviceInvocationEventHandlerComponent.setInterface(EventHandler.class.getName(),
props);
+ m_serviceInvocationEventHandlerComponent.setImplementation(new
ServiceInvocationEventHandler());
+ }
+
+ public synchronized void destroy() {
+ }
+
public synchronized void start() {
- m_clusterMessageService.subscribe(this);
+ m_logService.log(LogService.LOG_WARNING, "Starting LocalServiceInvocationHandler");
+ m_dependencyManager.add(m_serviceInvocationEventHandlerComponent);
}
public synchronized void stop() {
- m_clusterMessageService.unsubscribe(this);
+ m_logService.log(LogService.LOG_WARNING, "Starting LocalServiceInvocationHandler");
+ m_dependencyManager.remove(m_serviceInvocationEventHandlerComponent);
}
-
/********************************************************
* InvocationHandler
********************************************************/
public Object invoke(Object proxy, Method method, Object[] args) throws
Throwable {
if (isServiceInterfaceInvocation(method)) {
+
String invocationIdentifier = createNewInvocationIdentifier();
- Map<String, Object> payload =
getInvocationPayload(invocationIdentifier, method, args);
- m_clusterMessageService.publish(m_serviceEndpointTopic, new
EndpointInvokeMessage(m_serviceEndpoint,
- payload));
+ Map<String, Object> messagePayload =
getInvocationPayload(invocationIdentifier, method, args);
+ EndpointInvokeMessage message = new
EndpointInvokeMessage(m_serviceEndpoint, messagePayload);
+ // FIXME this is awkward
+ message.setOriginServcieGroup(m_serviceGroupId);
+
+ Dictionary<String, Object> eventPayload = new Hashtable<String,
Object>();
+ eventPayload.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY,
message);
+ Event event =
+ new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" +
m_clusterGroupId,
+ eventPayload);
+ m_eventAdmin.postEvent(event);
+
Object response = retrieveInvocationResponse(invocationIdentifier);
return response;
}
@@ -112,27 +155,6 @@
}
/********************************************************
- * ClusterTopicListner
- ********************************************************/
-
- public String getTopic() {
- return m_serviceEndpointTopic;
- }
-
- public void recieveMessage(Object message) {
- if (message instanceof EndpointResponseMessage) {
- EndpointResponseMessage endpointResponseMessage =
(EndpointResponseMessage) message;
- Map<String, Object> payload = endpointResponseMessage.getPayload();
- String invocationId = (String)
payload.get(DistributionServiceImpl.MESSAGE_INVOCATION_ID_KEY);
- if (ownsInvocationIndentifier(invocationId)) {
- Object response =
payload.get(DistributionServiceImpl.MESSAGE_INVOCATION_RESPONSE_MAP_KEY);
- storeResponseObject(invocationId, response);
- removeInvocationIdentifier(invocationId);
- }
- }
- }
-
- /********************************************************
* Object
********************************************************/
@@ -164,9 +186,13 @@
private String createNewInvocationIdentifier() {
String invocationId = UUID.randomUUID().toString();
- synchronized (m_invocationIdentifiers) {
+ m_invocationIdentifiersLock.writeLock().lock();
+ try {
m_invocationIdentifiers.add(invocationId);
}
+ finally {
+ m_invocationIdentifiersLock.writeLock().unlock();
+ }
return invocationId;
}
@@ -175,10 +201,6 @@
payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ID_KEY,
invocationId);
payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_METHODNAME_KEY,
method.getName());
payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ARGUMENTS_KEY,
args);
-
payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ORIGIN_CLUSTERID_KEY,
- m_clusterMemberService.getClusterId());
-
payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ORIGIN_MEMBERID_KEY,
- m_clusterMemberService.getMemberId());
return payload;
}
@@ -197,7 +219,8 @@
}
}
if (isResponseTimedOut) {
- // FIXME now what?
+ m_logService.log(LogService.LOG_WARNING, "Waiting for invocation
response " + invocationId
+ + " timed out after " + INVOCATION_TIMEOUT + "ms");
}
return response;
}
@@ -258,6 +281,27 @@
private boolean isServiceInterfaceInvocation(final Method method) {
// no lock needed
- return m_interfaceMethods.contains(method);
+ return m_serviceInterfaceMethods.contains(method);
+ }
+
+ /********************************************************
+ * Helper classes
+ ********************************************************/
+
+ /**
+  * EventHandler that picks EndpointResponseMessage instances out of incoming
+  * events and stores the carried response for invocations owned by the
+  * enclosing instance. Events carrying any other message type are ignored.
+  */
+ class ServiceInvocationEventHandler implements EventHandler {
+
+ public void handleEvent(Event event) {
+ Object message =
event.getProperty(ClusterMemberService.EVENT_MESSAGE_PROPERTY);
+ if (message instanceof EndpointResponseMessage) {
+ EndpointResponseMessage endpointResponseMessage =
(EndpointResponseMessage) message;
+ Map<String, Object> payload =
endpointResponseMessage.getPayload();
+ String invocationId = (String)
payload.get(DistributionServiceImpl.MESSAGE_INVOCATION_ID_KEY);
+ // only responses to invocation identifiers tracked by the enclosing instance are stored
+ if (ownsInvocationIndentifier(invocationId)) {
+ Object response =
payload.get(DistributionServiceImpl.MESSAGE_INVOCATION_RESPONSE_MAP_KEY);
+ // store the response first, then drop the identifier from the tracked set
+ storeResponseObject(invocationId, response);
+ removeInvocationIdentifier(invocationId);
+ }
+ }
+ }
+ }
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
Fri Dec 31 17:13:10 2010
@@ -25,18 +25,22 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.ClusterMessageService;
-import org.amdatu.core.fabric.cluster.ClusterTopicListener;
import org.amdatu.core.fabric.remote.DiscoveryService;
import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
import org.amdatu.core.fabric.remote.RemoteServiceEndPoint;
import org.amdatu.core.fabric.remote.ServiceEndPoint;
+import org.amdatu.core.fabric.remote.internal.DiscoveryUtilities;
import org.amdatu.core.fabric.remote.internal.EndpointDepublishMessage;
import org.amdatu.core.fabric.remote.internal.EndpointDiscoveryMessage;
import org.amdatu.core.fabric.remote.internal.EndpointPublishMessage;
import org.apache.felix.dm.Component;
import org.apache.felix.dm.DependencyManager;
+import org.apache.felix.dm.ServiceDependency;
import org.osgi.framework.ServiceReference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
import org.osgi.service.log.LogService;
/**
@@ -44,7 +48,7 @@
* I listen to the cluster and publish local RemoteServiceEndpoint services
for them
* TODO support LOOKUP requests for more fine grained discovery (use
ListenerHook)
*/
-public class DiscoveryServiceImpl implements DiscoveryService,
ClusterTopicListener {
+public final class DiscoveryServiceImpl implements DiscoveryService {
private final Set<ServiceEndPoint> m_remotableEndPoints = new
HashSet<ServiceEndPoint>();
private final ReentrantReadWriteLock m_remotableEndPointsLock = new
ReentrantReadWriteLock();
@@ -53,35 +57,92 @@
new HashMap<ServiceEndPoint, Component>();
private final ReentrantReadWriteLock m_remoteEndPointComponentsLock = new
ReentrantReadWriteLock();
- private volatile ClusterMemberService m_clusterMemberService;
- private volatile ClusterMessageService m_clusterMessageService;
private volatile DependencyManager m_dependencyManager;
private volatile Component m_component;
+ private volatile EventAdmin m_evenAdmin;
private volatile LogService m_logService;
+ private volatile Component m_discoveryEventHandlerComponent;
+
+ private final String m_clusterGroupId;
+ private final String m_serviceGroupId;
+
/********************************************************
* Constructors
********************************************************/
- public DiscoveryServiceImpl() {
-
+ public DiscoveryServiceImpl(String clusterGroupId, String serviceGroupId) {
+ m_clusterGroupId = clusterGroupId;
+ m_serviceGroupId = serviceGroupId;
}
/********************************************************
* Life cycle
********************************************************/
+ /**
+  * Configures this component: publishes the cluster group and service group
+  * as service properties, declares the service dependencies, and prepares
+  * (without registering) the EventHandler component for the local discovery
+  * topic. The handler component is added to the dependency manager in start().
+  * NOTE(review): presumably invoked by the Felix DependencyManager as the
+  * component's init lifecycle callback - confirm against the component setup.
+  */
+ public synchronized void init() {
+ // the component may not have any service properties yet; start from an empty table then
+ @SuppressWarnings("unchecked")
+ Dictionary<String, Object> discoveryProps =
m_component.getServiceProperties();
+ if (discoveryProps == null) {
+ discoveryProps = new Hashtable<String, Object>();
+ }
+ discoveryProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
m_clusterGroupId);
+ discoveryProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
+ m_component.setServiceProperties(discoveryProps);
+
+ // required: LogService
+ ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
+ logServiceDependency.setService(LogService.class);
+ logServiceDependency.setRequired(true);
+ m_component.add(logServiceDependency);
+
+ // required: EventAdmin
+ ServiceDependency eventAdminServiceDependency =
m_dependencyManager.createServiceDependency();
+ eventAdminServiceDependency.setService(EventAdmin.class);
+ eventAdminServiceDependency.setRequired(true);
+ m_component.add(eventAdminServiceDependency);
+
+ // required: a ClusterMemberService whose cluster group property matches this group
+ ServiceDependency clusterMemberDependency =
m_dependencyManager.createServiceDependency();
+ clusterMemberDependency
+ .setService(
+ ClusterMemberService.class,
+ "(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "=" +
m_clusterGroupId + ")")
+ .setRequired(true);
+ m_component.add(clusterMemberDependency);
+
+ // optional: RemotableServiceEndpoint services of this cluster group and service group,
+ // reported through the remotableServiceEndPointAdded/Removed callbacks
+ ServiceDependency remotableServiceEndpointsDependecy =
m_dependencyManager.createServiceDependency();
+ remotableServiceEndpointsDependecy
+ .setService(RemotableServiceEndpoint.class,
+ "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "="
+ + m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ + m_serviceGroupId + "))")
+ .setCallbacks("remotableServiceEndPointAdded",
"remotableServiceEndPointRemoved")
+ .setRequired(false);
+ m_component.add(remotableServiceEndpointsDependecy);
+
+ // EventHandler component subscribed to the local discovery topic of this group pair;
+ // it is only created here, not yet added to the dependency manager
+ Dictionary<String, Object> eventHandlerProps = new Hashtable<String,
Object>();
+ eventHandlerProps.put(EventConstants.EVENT_TOPIC,
+ new String[] {
DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterGroupId, m_serviceGroupId)
});
+ m_discoveryEventHandlerComponent =
m_dependencyManager.createComponent();
+
m_discoveryEventHandlerComponent.setInterface(EventHandler.class.getName(),
eventHandlerProps);
+ m_discoveryEventHandlerComponent.setImplementation(new
DiscoveryEventHandler());
+ }
+
+ /**
+  * Removes every remote endpoint component this service has registered with
+  * the dependency manager and clears the bookkeeping map.
+  */
+ public synchronized void destroy() {
+ removeRemoteEndpointComponents();
+ }
+
public synchronized void start() {
- m_clusterMessageService.subscribe(this);
- m_clusterMessageService.publish(DISCOVERY_TOPIC, new
EndpointDiscoveryMessage());
+ m_logService.log(LogService.LOG_INFO, "Starting " + toString());
+ m_dependencyManager.add(m_discoveryEventHandlerComponent);
+ m_evenAdmin.postEvent(createdDiscoveryEvent());
}
public synchronized void stop() {
- m_clusterMessageService.unsubscribe(this);
+ m_logService.log(LogService.LOG_INFO, "Stopping " + toString());
+ m_dependencyManager.remove(m_discoveryEventHandlerComponent);
}
/********************************************************
- * Callbacks
+ * Dependency callbacks
********************************************************/
public void remotableServiceEndPointAdded(
@@ -92,7 +153,7 @@
try {
if (!m_remotableEndPoints.contains(serviceEndPoint)) {
m_remotableEndPoints.add(serviceEndPoint);
- m_clusterMessageService.publish(DISCOVERY_TOPIC, new
EndpointPublishMessage(serviceEndPoint));
+
m_evenAdmin.postEvent(createEndpointPublishEvent(serviceEndPoint));
}
else {
throw new IllegalStateException("Unexpected state... needs
analysis");
@@ -111,8 +172,8 @@
m_remotableEndPointsLock.writeLock().lock();
try {
if (m_remotableEndPoints.contains(serviceEndPoint)) {
- m_clusterMessageService.publish(DISCOVERY_TOPIC, new
EndpointDepublishMessage(serviceEndPoint));
m_remotableEndPoints.remove(serviceEndPoint);
+
m_evenAdmin.postEvent(createEndpointDepublishEvent(serviceEndPoint));
}
else {
throw new IllegalStateException("Unexpected state... needs
analysis");
@@ -124,97 +185,137 @@
}
/********************************************************
- * DiscoveryService
+ * Private methods
********************************************************/
- public ServiceEndPoint[] getLocalServiceEndPoints() {
- m_remotableEndPointsLock.readLock().lock();
+ /**
+  * Builds an event on the broadcast topic of this cluster group that carries
+  * a new EndpointDiscoveryMessage for this cluster group and service group.
+  * A DiscoveryServiceImpl that receives such a message re-posts a publish
+  * event for each of its remotable endpoints (see recieveEndpointDiscoveryMessage).
+  *
+  * @return the event, ready to be posted
+  */
+ private Event createdDiscoveryEvent() {
+ Dictionary<String, Object> eventProps = new Hashtable<String,
Object>();
+ eventProps.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY,
+ new EndpointDiscoveryMessage(m_clusterGroupId, m_serviceGroupId));
+ Event discoveryEvent =
+ new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" +
m_clusterGroupId,
+ eventProps);
+ return discoveryEvent;
+ }
+
+ /**
+  * Builds an event on the broadcast topic of this cluster group that carries
+  * an EndpointPublishMessage for the given service endpoint.
+  *
+  * @param serviceEndPoint the endpoint to wrap in the publish message
+  * @return the event, ready to be posted
+  */
+ private Event createEndpointPublishEvent(ServiceEndPoint serviceEndPoint) {
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, new
EndpointPublishMessage(serviceEndPoint));
+ Event event =
+ new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" +
m_clusterGroupId,
+ props);
+ return event;
+ }
+
+ /**
+  * Builds an event on the broadcast topic of this cluster group that carries
+  * an EndpointDepublishMessage for the given service endpoint.
+  *
+  * @param serviceEndPoint the endpoint to wrap in the depublish message
+  * @return the event, ready to be posted
+  */
+ private Event createEndpointDepublishEvent(ServiceEndPoint
serviceEndPoint) {
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, new
EndpointDepublishMessage(serviceEndPoint));
+ Event event =
+ new Event(ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" +
m_clusterGroupId,
+ props);
+ return event;
+ }
+
+ private void removeRemoteEndpointComponents() {
+ m_remoteEndPointComponentsLock.writeLock().lock();
try {
- return m_remotableEndPoints.toArray(new
ServiceEndPoint[m_remotableEndPoints.size()]);
+ for (Component remoteEndPointComponent :
m_remoteEndPointComponents.values()) {
+ m_dependencyManager.remove(remoteEndPointComponent);
+ }
+ m_remoteEndPointComponents.clear();
}
finally {
- m_remotableEndPointsLock.readLock().unlock();
+ m_remoteEndPointComponentsLock.writeLock().unlock();
}
}
- public ServiceEndPoint[] getRemoteServiceEndPoints() {
- m_remoteEndPointComponentsLock.readLock().lock();
+ private void recieveEndpointDiscoveryMessage(final
EndpointDiscoveryMessage endpointDiscoveryMessage) {
+ m_remotableEndPointsLock.writeLock().lock();
try {
- return m_remoteEndPointComponents.keySet().toArray(
- new
ServiceEndPoint[m_remoteEndPointComponents.keySet().size()]);
+ for (ServiceEndPoint serviceEndPoint : m_remotableEndPoints) {
+ Event event = createEndpointPublishEvent(serviceEndPoint);
+ m_evenAdmin.postEvent(event);
+ }
}
finally {
- m_remoteEndPointComponentsLock.readLock().unlock();
+ m_remotableEndPointsLock.writeLock().unlock();
}
+ return;
}
- /********************************************************
- * ClusterTopicListner
- ********************************************************/
+ /**
+  * Handles a depublish message: takes the component registered for the
+  * message's service endpoint out of the bookkeeping map while holding the
+  * write lock, then removes that component from the dependency manager after
+  * the lock has been released. Does nothing when no component is known for
+  * the endpoint.
+  *
+  * @param endpointDepublishMessage the received message naming the endpoint
+  */
+ private void recieveEndpointDeplublishMessage(EndpointDepublishMessage
endpointDepublishMessage) {
+ ServiceEndPoint serviceEndPoint =
endpointDepublishMessage.getServiceEndPoint();
+ Component serviceComponent;
+ m_remoteEndPointComponentsLock.writeLock().lock();
+ try {
+ // unknown endpoint: nothing to remove (the finally block still releases the lock)
+ if (!m_remoteEndPointComponents.containsKey(serviceEndPoint))
+ return;
+ serviceComponent =
m_remoteEndPointComponents.remove(serviceEndPoint);
+ }
+ finally {
+ m_remoteEndPointComponentsLock.writeLock().unlock();
+ }
+ // the dependency manager is called only after the lock has been released
+ if (serviceComponent != null)
+ m_dependencyManager.remove(serviceComponent);
+ return;
+ }
- public void recieveMessage(Object message) {
- if (message instanceof EndpointPublishMessage) {
- EndpointPublishMessage endpointPublishMessage =
(EndpointPublishMessage) message;
- ServiceEndPoint serviceEndPoint =
endpointPublishMessage.getServiceEndPoint();
- m_remoteEndPointComponentsLock.writeLock().lock();
- try {
- if (m_remoteEndPointComponents.containsKey(serviceEndPoint))
- return;
- Dictionary<String, Object> distributionProps = new
Hashtable<String, Object>();
-
distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERID_PROP,
- m_clusterMemberService.getClusterId());
-
distributionProps.put(ClusterMemberService.CLUSTER_MEMBERID_PROP,
m_clusterMemberService.getMemberId());
- Component serviceComponent =
- m_dependencyManager.createComponent()
- .setInterface(RemoteServiceEndPoint.class.getName(),
distributionProps)
- .setImplementation(new
RemoteServiceEndPointImpl(serviceEndPoint));
-
serviceComponent.add(m_dependencyManager.createServiceDependency().setService(
- DiscoveryService.class,
- "(&(" + ClusterMemberService.CLUSTER_CLUSTERID_PROP + "="
- +
m_component.getServiceProperties().get(ClusterMemberService.CLUSTER_CLUSTERID_PROP)
+ ")("
- + ClusterMemberService.CLUSTER_MEMBERID_PROP + "="
- +
m_component.getServiceProperties().get(ClusterMemberService.CLUSTER_MEMBERID_PROP)
+ "))")
- .setRequired(true));
- m_dependencyManager.add(serviceComponent);
- m_remoteEndPointComponents.put(serviceEndPoint,
serviceComponent);
- }
- finally {
- m_remoteEndPointComponentsLock.writeLock().unlock();
- }
- return;
- }
- if (message instanceof EndpointDepublishMessage) {
- EndpointDepublishMessage endpointDepublishMessage =
(EndpointDepublishMessage) message;
- ServiceEndPoint serviceEndPoint =
endpointDepublishMessage.getServiceEndPoint();
- m_remoteEndPointComponentsLock.writeLock().lock();
- try {
- if (!m_remoteEndPointComponents.containsKey(serviceEndPoint))
- return;
- Component serviceComponent =
m_remoteEndPointComponents.remove(serviceEndPoint);
- m_dependencyManager.remove(serviceComponent);
- }
- finally {
- m_remoteEndPointComponentsLock.writeLock().unlock();
- }
- return;
- }
- if (message instanceof EndpointDiscoveryMessage) {
- m_remotableEndPointsLock.writeLock().lock();
- try {
- for (ServiceEndPoint serviceEndPoint : m_remotableEndPoints) {
- m_clusterMessageService.publish(DISCOVERY_TOPIC, new
EndpointPublishMessage(serviceEndPoint));
- }
- }
- finally {
- m_remotableEndPointsLock.writeLock().unlock();
+ private void recieveEndpointPublishMessage(EndpointPublishMessage
endpointPublishMessage) {
+ ServiceEndPoint serviceEndPoint =
endpointPublishMessage.getServiceEndPoint();
+ Component serviceComponent = null;
+ m_remoteEndPointComponentsLock.writeLock().lock();
+ try {
+ if (m_remoteEndPointComponents.containsKey(serviceEndPoint)) {
+ return;
}
- return;
+ Dictionary<String, Object> distributionProps = new
Hashtable<String, Object>();
+
distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
+ m_clusterGroupId);
+ distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
+ m_serviceGroupId);
+ serviceComponent =
+ m_dependencyManager.createComponent()
+ .setInterface(RemoteServiceEndPoint.class.getName(),
distributionProps)
+ .setImplementation(new
RemoteServiceEndPointImpl(serviceEndPoint));
+
serviceComponent.add(m_dependencyManager.createServiceDependency().setService(
+ DiscoveryService.class,
+ "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "="
+ + m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ + m_serviceGroupId + "))")
+ .setRequired(true));
+ m_remoteEndPointComponents.put(serviceEndPoint, serviceComponent);
+ }
+ finally {
+ m_remoteEndPointComponentsLock.writeLock().unlock();
}
- throw new IllegalStateException("Unknown message type " +
message.getClass().getName() + "on channel "
- + DISCOVERY_TOPIC);
+ if (serviceComponent != null) {
+ m_dependencyManager.add(serviceComponent);
+ }
+ return;
}
- public String getTopic() {
- return DISCOVERY_TOPIC;
+ /********************************************************
+ * Helper classes
+ ********************************************************/
+
+ /**
+  * EventHandler that dispatches messages received on the local discovery
+  * topic to the matching recieve* method of the enclosing DiscoveryServiceImpl.
+  */
+ class DiscoveryEventHandler implements EventHandler {
+
+ /**
+  * Reads the message property from the event and forwards publish,
+  * depublish and discovery messages to their handlers.
+  *
+  * @param event the received event
+  * @throws IllegalStateException when the event carries no message or a
+  *         message of an unsupported type
+  */
+ public void handleEvent(Event event) {
+ Object message = event.getProperty(ClusterMemberService.EVENT_MESSAGE_PROPERTY);
+ if (message instanceof EndpointPublishMessage) {
+ recieveEndpointPublishMessage((EndpointPublishMessage) message);
+ return;
+ }
+ if (message instanceof EndpointDepublishMessage) {
+ recieveEndpointDeplublishMessage((EndpointDepublishMessage) message);
+ return;
+ }
+ if (message instanceof EndpointDiscoveryMessage) {
+ recieveEndpointDiscoveryMessage((EndpointDiscoveryMessage) message);
+ return;
+ }
+ // getProperty yields null when the event has no message property; name that case
+ // explicitly instead of failing with a NullPointerException while building the text
+ String messageType = (message == null) ? "null" : message.getClass().getName();
+ throw new IllegalStateException("Unknown message type " + messageType + " on channel "
+ + DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterGroupId, m_serviceGroupId));
+ }
+ }
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
Fri Dec 31 17:13:10 2010
@@ -26,8 +26,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.ClusterMessageService;
-import org.amdatu.core.fabric.cluster.ClusterTopicListener;
+import org.amdatu.core.fabric.remote.DiscoveryService;
import org.amdatu.core.fabric.remote.DistributionService;
import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
import org.amdatu.core.fabric.remote.RemoteServiceEndPoint;
@@ -39,11 +38,17 @@
import org.amdatu.core.fabric.remote.internal.LocalServiceInvocationHandler;
import org.apache.felix.dm.Component;
import org.apache.felix.dm.DependencyManager;
+import org.apache.felix.dm.ServiceDependency;
import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
import org.osgi.framework.ServiceReference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
import org.osgi.service.log.LogService;
-public class DistributionServiceImpl implements DistributionService,
ClusterTopicListener {
+public class DistributionServiceImpl implements DistributionService {
public final static String MESSAGE_INVOCATION_ID_KEY = "A";
public final static String MESSAGE_INVOCATION_METHODNAME_KEY = "B";
@@ -52,80 +57,131 @@
public final static String MESSAGE_INVOCATION_ORIGIN_CLUSTERID_KEY = "E";
public final static String MESSAGE_INVOCATION_ORIGIN_MEMBERID_KEY = "F";
- private final Map<ServiceEndPoint, ServiceReferenceComponentTuple>
m_serviceEndPointServiceReferenceComponents =
+ private final Map<ServiceEndPoint, ServiceReferenceComponentTuple>
m_localServiceEndPointServiceReferenceComponents =
new HashMap<ServiceEndPoint, ServiceReferenceComponentTuple>();
- private final ReentrantReadWriteLock
m_serviceEndPointServiceReferenceComponentsLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock
m_localServiceEndPointServiceReferenceComponentsLock =
+ new ReentrantReadWriteLock();
- private final Map<ServiceEndPoint, Component> m_serviceEndPointComponents =
+ private final Map<ServiceEndPoint, Component>
m_remoteServiceEndPointComponents =
new HashMap<ServiceEndPoint, Component>();
- private final ReentrantReadWriteLock m_serviceEndPointComponentsLock = new
ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock m_remoteServiceEndPointComponentsLock
= new ReentrantReadWriteLock();
- private volatile ClusterMemberService m_clusterMemberService;
- private volatile ClusterMessageService m_clusterMessageService;
private volatile DependencyManager m_dependencyManager;
private volatile BundleContext m_bundleContext;
private volatile Component m_component;
+ private volatile EventAdmin m_eventAdmin;
private volatile LogService m_logService;
+ private volatile Component m_distributionEventHandlerComponent;
+
+ private final String m_clusterGroupId;
+ private final String m_serviceGroupId;
+
/********************************************************
* Constructors
********************************************************/
- public DistributionServiceImpl() {
+ public DistributionServiceImpl(String clusterGroupId, String
serviceGroupId) {
+ m_clusterGroupId = clusterGroupId;
+ m_serviceGroupId = serviceGroupId;
}
/********************************************************
- * Service life cycle methods
+ * Service lifecycle
********************************************************/
- public synchronized void start() {
- m_logService.log(LogService.LOG_WARNING, "Starting
DistributionService");
+ public synchronized void init() {
+ @SuppressWarnings("unchecked")
Dictionary<String, Object> distributionProps =
m_component.getServiceProperties();
- distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERID_PROP,
m_clusterMemberService.getClusterId());
- distributionProps.put(ClusterMemberService.CLUSTER_MEMBERID_PROP,
m_clusterMemberService.getMemberId());
+ if (distributionProps == null) {
+ distributionProps = new Hashtable<String, Object>();
+ }
+ distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
m_clusterGroupId);
+ distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
distributionProps.put(DistributionService.DISTRIBUTION_CONFIGS_SUPPORTED_PROP,
DistributionService.DISTRIBUTION_CONFIGS_SUPPORTED);
distributionProps.put(DistributionService.DISTRIBUTION_INTENTS_SUPPORTED_PROP,
DistributionService.DISTRIBUTION_INTENTS_SUPPORTED);
m_component.setServiceProperties(distributionProps);
- m_clusterMessageService.subscribe(this);
+
+ ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
+ logServiceDependency.setService(LogService.class);
+ logServiceDependency.setRequired(true);
+ m_component.add(logServiceDependency);
+
+ ServiceDependency eventAdminServiceDependency =
m_dependencyManager.createServiceDependency();
+ eventAdminServiceDependency.setService(EventAdmin.class);
+ eventAdminServiceDependency.setRequired(true);
+ m_component.add(eventAdminServiceDependency);
+
+ ServiceDependency clusterMemberDependency =
m_dependencyManager.createServiceDependency();
+ clusterMemberDependency
+ .setService(
+ ClusterMemberService.class,
+ "(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "=" +
m_clusterGroupId + ")")
+ .setRequired(true);
+ m_component.add(clusterMemberDependency);
+
+ ServiceDependency remotableServicesDependecy =
m_dependencyManager.createServiceDependency();
+ remotableServicesDependecy
+ .setService(
+ "("
+ + DistributionService.SERVICE_EXPORTED_CONFIGS_PROP +
"="
+ + DistributionService.SERVICE_CONFIGURATION_TYPE + ")")
+ .setCallbacks("localRemotableServiceAdded",
"localRemotableServiceRemoved")
+ .setRequired(false);
+ m_component.add(remotableServicesDependecy);
+
+ ServiceDependency remoteServiceEndpointsDependecy =
m_dependencyManager.createServiceDependency();
+ remoteServiceEndpointsDependecy
+ .setService(RemoteServiceEndPoint.class,
+ "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "="
+ + m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ + m_serviceGroupId + "))")
+ .setCallbacks("remoteServiceEndPointAdded",
"remoteServiceEndPointRemoved")
+ .setRequired(false);
+ m_component.add(remoteServiceEndpointsDependecy);
+
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(EventConstants.EVENT_TOPIC,
+ new String[] {
DistributionUtilities.getLocalInvokeTopic(m_clusterGroupId, m_serviceGroupId)
});
+ m_distributionEventHandlerComponent =
m_dependencyManager.createComponent();
+
m_distributionEventHandlerComponent.setInterface(EventHandler.class.getName(),
props);
+ m_distributionEventHandlerComponent.setImplementation(new
DistributionEventHandler());
+ }
+
+ /**
+  * Tears down the endpoint components created by this service by delegating
+  * to the local and remote removal helpers, in that order.
+  */
+ public synchronized void destroy() {
+ removeLocalServiceEndPointComponents();
+ removeRemoteServiceEndPointComponents();
+ }
+
+ public synchronized void start() {
+ m_logService.log(LogService.LOG_INFO, "Starting " + toString());
+ m_dependencyManager.add(m_distributionEventHandlerComponent);
+
}
public synchronized void stop() {
- m_logService.log(LogService.LOG_WARNING, "Stopping
DistributionService");
- m_clusterMessageService.unsubscribe(this);
+ m_logService.log(LogService.LOG_INFO, "Stopping " + toString());
+ m_dependencyManager.remove(m_distributionEventHandlerComponent);
}
/********************************************************
* Dependency Callback methods
********************************************************/
- public void localRemotableServiceAdded(ServiceReference serviceReference
/* , Object Service */) {
+ public void localRemotableServiceAdded(final ServiceReference
serviceReference /* , Object Service */) {
ServiceEndPoint serviceEndPoint =
serviceEndPointFromServiceReference(serviceReference);
if (!isServiceEndpointConfigurationSupported(serviceEndPoint)) {
m_logService
.log(LogService.LOG_WARNING, "Unsupported ServiceEndPoint
configuration " + serviceEndPoint.toString());
return;
}
- Dictionary<String, Object> distributionProps = new Hashtable<String,
Object>();
- distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERID_PROP,
m_clusterMemberService.getClusterId());
- distributionProps.put(ClusterMemberService.CLUSTER_MEMBERID_PROP,
m_clusterMemberService.getMemberId());
- Component serviceComponent =
- m_dependencyManager.createComponent()
- .setInterface(RemotableServiceEndpoint.class.getName(),
distributionProps)
- .setImplementation(new
RemotableServiceEndPointImpl(serviceEndPoint));
-
serviceComponent.add(m_dependencyManager.createServiceDependency().setService(
- DistributionService.class,
- "(&(" + ClusterMemberService.CLUSTER_CLUSTERID_PROP + "="
- +
m_component.getServiceProperties().get(ClusterMemberService.CLUSTER_CLUSTERID_PROP)
+ ")("
- + ClusterMemberService.CLUSTER_MEMBERID_PROP + "="
- +
m_component.getServiceProperties().get(ClusterMemberService.CLUSTER_MEMBERID_PROP)
+ "))")
- .setRequired(true));
-
- m_serviceEndPointServiceReferenceComponentsLock.writeLock().lock();
+ Component serviceComponent =
createRemotableEndPointComponent(serviceEndPoint);
+
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().lock();
try {
- if
(!m_serviceEndPointServiceReferenceComponents.containsKey(serviceEndPoint)) {
-
m_serviceEndPointServiceReferenceComponents.put(serviceEndPoint,
+ if
(!m_localServiceEndPointServiceReferenceComponents.containsKey(serviceEndPoint))
{
+
m_localServiceEndPointServiceReferenceComponents.put(serviceEndPoint,
new ServiceReferenceComponentTuple(
serviceReference, serviceComponent));
m_dependencyManager.add(serviceComponent);
@@ -133,17 +189,17 @@
}
}
finally {
-
m_serviceEndPointServiceReferenceComponentsLock.writeLock().unlock();
+
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().unlock();
}
}
public void localRemotableServiceRemoved(ServiceReference serviceReference
/* , Object Service */) {
ServiceEndPoint serviceEndPoint =
serviceEndPointFromServiceReference(serviceReference);
- m_serviceEndPointServiceReferenceComponentsLock.writeLock().lock();
+
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().lock();
try {
- if
(m_serviceEndPointServiceReferenceComponents.containsKey(serviceEndPoint)) {
+ if
(m_localServiceEndPointServiceReferenceComponents.containsKey(serviceEndPoint))
{
ServiceReferenceComponentTuple serviceReferenceComponentTuple =
-
m_serviceEndPointServiceReferenceComponents.remove(serviceEndPoint);
+
m_localServiceEndPointServiceReferenceComponents.remove(serviceEndPoint);
m_dependencyManager.remove(serviceReferenceComponentTuple.getComponent());
m_logService
.log(LogService.LOG_WARNING, "Removed local
ServiceEndPoint: " + serviceEndPoint.toString());
@@ -155,7 +211,7 @@
}
}
finally {
-
m_serviceEndPointServiceReferenceComponentsLock.writeLock().unlock();
+
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().unlock();
}
}
@@ -165,10 +221,10 @@
Object localServiceProxy =
createLocalServiceInvocationHandler(serviceEndPoint);
if (localServiceProxy != null) {
Component localServiceComponent =
createLocalServiceComponent(serviceEndPoint, localServiceProxy);
- m_serviceEndPointComponentsLock.writeLock().lock();
+ m_remoteServiceEndPointComponentsLock.writeLock().lock();
try {
- if (!m_serviceEndPointComponents.containsKey(serviceEndPoint))
{
- m_serviceEndPointComponents.put(serviceEndPoint,
localServiceComponent);
+ if
(!m_remoteServiceEndPointComponents.containsKey(serviceEndPoint)) {
+ m_remoteServiceEndPointComponents.put(serviceEndPoint,
localServiceComponent);
m_dependencyManager.add(localServiceComponent);
m_logService.log(LogService.LOG_WARNING,
"Added remote ServiceEndPoint: " +
serviceEndPoint.toString());
@@ -180,7 +236,7 @@
}
}
finally {
- m_serviceEndPointComponentsLock.writeLock().unlock();
+ m_remoteServiceEndPointComponentsLock.writeLock().unlock();
}
}
}
@@ -188,10 +244,10 @@
public void remoteServiceEndPointRemoved(/* ServiceReference
serviceReference, */Object remoteServiceEndPointObject) {
RemoteServiceEndPoint remoteServiceEndPoint = (RemoteServiceEndPoint)
remoteServiceEndPointObject;
ServiceEndPoint serviceEndPoint =
remoteServiceEndPoint.getServiceEndPoint();
- m_serviceEndPointComponentsLock.writeLock().lock();
+ m_remoteServiceEndPointComponentsLock.writeLock().lock();
try {
- if (m_serviceEndPointComponents.containsKey(serviceEndPoint)) {
- Component localServiceComponent =
m_serviceEndPointComponents.get(serviceEndPoint);
+ if
(m_remoteServiceEndPointComponents.containsKey(serviceEndPoint)) {
+ Component localServiceComponent =
m_remoteServiceEndPointComponents.remove(serviceEndPoint);
m_dependencyManager.remove(localServiceComponent);
m_logService.log(LogService.LOG_WARNING,
"Removed remote ServiceEndPoint: " +
serviceEndPoint.toString());
@@ -203,112 +259,63 @@
}
}
finally {
- m_serviceEndPointComponentsLock.writeLock().unlock();
+ m_remoteServiceEndPointComponentsLock.writeLock().unlock();
}
}
/********************************************************
- * ClusterTopicListener
+ * Private methods
********************************************************/
- public String getTopic() {
- return DistributionService.REMOTE_TOPIC;
+ private void removeLocalServiceEndPointComponents() {
+
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().lock();
+ try {
+ for (ServiceReferenceComponentTuple serviceReferenceComponentTuple
: m_localServiceEndPointServiceReferenceComponents
+ .values()) {
+
m_dependencyManager.remove(serviceReferenceComponentTuple.getComponent());
+ }
+ m_localServiceEndPointServiceReferenceComponents.clear();
+ }
+ finally {
+
m_localServiceEndPointServiceReferenceComponentsLock.writeLock().unlock();
+ }
}
- public void recieveMessage(Object message) {
- if (message instanceof EndpointInvokeMessage) {
- EndpointInvokeMessage endpointInvokeMessage =
(EndpointInvokeMessage) message;
- ServiceEndPoint serviceEndPoint =
endpointInvokeMessage.getServiceEndPoint();
- Map<String, Object> payload = endpointInvokeMessage.getPayload();
- String invocationId = (String)
payload.get(MESSAGE_INVOCATION_ID_KEY);
- String originClusterId = (String)
payload.get(MESSAGE_INVOCATION_ORIGIN_CLUSTERID_KEY);
- String originMemberId = (String)
payload.get(MESSAGE_INVOCATION_ORIGIN_MEMBERID_KEY);
- String methodName = (String)
payload.get(MESSAGE_INVOCATION_METHODNAME_KEY);
- Object[] args = (Object[])
payload.get(MESSAGE_INVOCATION_ARGUMENTS_KEY);
- Class<?>[] types = DistributionUtilities.getTypesFromArgs(args);
-
- Object serviceResponse = null;
- m_serviceEndPointServiceReferenceComponentsLock.readLock().lock();
- try {
- ServiceReferenceComponentTuple serviceReferenceComponentTuple =
-
m_serviceEndPointServiceReferenceComponents.get(serviceEndPoint);
- if (serviceReferenceComponentTuple == null) {
- if
(serviceEndPoint.getMemberId().equals(m_clusterMemberService.getMemberId())) {
- // TODO local service gone.. what to do? Send back
error to prevent client from waiting...
- m_logService
- .log(
- LogService.LOG_WARNING,
- "Dropping EndpointInvokeMessage for unknown
ServiceEndPoint: "
- + serviceEndPoint.toString());
- }
- else {
- m_logService.log(LogService.LOG_WARNING,
- "Dropping EndpointInvokeMessage for other
clustermember: " + serviceEndPoint.toString());
- }
- return;
- }
- ServiceReference serviceReference =
serviceReferenceComponentTuple.getServiceReference();
- try {
- Object serviceObject =
m_bundleContext.getService(serviceReference);
- if (serviceObject == null) {
- // TODO local service gone.. what to do? Send back
error to prevent client from waiting...
- m_logService
- .log(
- LogService.LOG_WARNING,
- "Dropping EndpointInvokeMessage for
unavailable service: "
- + serviceEndPoint.toString());
- return;
- }
- Method serviceMethod =
serviceObject.getClass().getMethod(methodName, types);
- serviceResponse = serviceMethod.invoke(serviceObject,
args);
-
- m_bundleContext.ungetService(serviceReference);
- }
- catch (SecurityException e) {
- // TODO its fooked.. what to do? Send back error to
prevent client from waiting...
- m_logService.log(LogService.LOG_ERROR, "Exception during
local service invocation", e);
- e.printStackTrace();
- }
- catch (NoSuchMethodException e) {
- // TODO its fooked.. what to do? Send back error to
prevent client from waiting...
- m_logService.log(LogService.LOG_ERROR, "Exception during
local service invocation", e);
- e.printStackTrace();
- }
- catch (IllegalArgumentException e) {
- // TODO its fooked.. what to do? Send back error to
prevent client from waiting...
- m_logService.log(LogService.LOG_ERROR, "Exception during
local service invocation", e);
- e.printStackTrace();
- }
- catch (IllegalAccessException e) {
- // TODO its fooked.. what to do? Send back error to
prevent client from waiting...
- m_logService.log(LogService.LOG_ERROR, "Exception during
local service invocation", e);
- e.printStackTrace();
- }
- catch (InvocationTargetException e) {
- // TODO its fooked.. what to do? Send back error to
prevent client from waiting...
- m_logService.log(LogService.LOG_ERROR, "Exception during
local service invocation", e);
- e.printStackTrace();
- }
- }
- finally {
-
m_serviceEndPointServiceReferenceComponentsLock.readLock().unlock();
- }
- if (serviceResponse != null) {
- Map<String, Object> responsePayload = new HashMap<String,
Object>();
- responsePayload.put(MESSAGE_INVOCATION_ID_KEY, invocationId);
- responsePayload.put(MESSAGE_INVOCATION_RESPONSE_MAP_KEY,
serviceResponse);
- m_clusterMessageService
- .publish(REMOTE_TOPIC,
- new EndpointResponseMessage(originClusterId,
originMemberId, responsePayload));
+ private void removeRemoteServiceEndPointComponents() {
+ m_remoteServiceEndPointComponentsLock.writeLock().lock();
+ try {
+ for (Component serviceEndPointComponent :
m_remoteServiceEndPointComponents.values()) {
+ m_dependencyManager.remove(serviceEndPointComponent);
}
+ m_remoteServiceEndPointComponents.clear();
+ }
+ finally {
+ m_remoteServiceEndPointComponentsLock.writeLock().unlock();
}
}
- /********************************************************
- * private
- ********************************************************/
+ /**
+  * Builds the component that publishes the given endpoint as a RemotableServiceEndpoint service.
+  * <p>
+  * The service is registered with this instance's cluster group and service group ids and carries
+  * a required dependency on the DistributionService registered with the same two ids.
+  *
+  * @param serviceEndPoint the endpoint wrapped by the published service
+  * @return the configured component; it is not added to the dependency manager here
+  */
+ private Component createRemotableEndPointComponent(final ServiceEndPoint serviceEndPoint) {
+     // Filter selecting the DistributionService of the same cluster group / service group.
+     final String distributionFilter =
+         "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP + "=" + m_clusterGroupId + ")("
+             + DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "=" + m_serviceGroupId + "))";
+
+     final Dictionary<String, Object> registrationProps = new Hashtable<String, Object>();
+     registrationProps.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, m_clusterGroupId);
+     registrationProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP, m_serviceGroupId);
+
+     final Component endPointComponent =
+         m_dependencyManager.createComponent()
+             .setInterface(RemotableServiceEndpoint.class.getName(), registrationProps)
+             .setImplementation(new RemotableServiceEndPointImpl(serviceEndPoint));
+
+     final ServiceDependency distributionDependency =
+         m_dependencyManager.createServiceDependency()
+             .setService(DistributionService.class, distributionFilter)
+             .setRequired(true);
+     endPointComponent.add(distributionDependency);
+     return endPointComponent;
+ }
- private Object createLocalServiceInvocationHandler(ServiceEndPoint
serviceEndpoint) {
+ private Object createLocalServiceInvocationHandler(final ServiceEndPoint
serviceEndpoint) {
Class<?>[] interfaceClasses = new
Class<?>[serviceEndpoint.getObjectClass().length];
for (int i = 0; i < serviceEndpoint.getObjectClass().length; i++) {
String interfaceName = serviceEndpoint.getObjectClass()[i];
@@ -322,48 +329,53 @@
}
}
LocalServiceInvocationHandler localServiceInvocationHandler =
- new LocalServiceInvocationHandler(serviceEndpoint,
interfaceClasses);
+ new LocalServiceInvocationHandler(m_clusterGroupId,
m_serviceGroupId, serviceEndpoint,
+ interfaceClasses);
Object serviceObject =
Proxy.newProxyInstance(interfaceClasses[0].getClassLoader(),
interfaceClasses, localServiceInvocationHandler);
return serviceObject;
}
- private Component createLocalServiceComponent(ServiceEndPoint
serviceEndPoint, Object serviceObject) {
- Hashtable<String, Object> registrationProperties =
serviceEndPoint.getProperties();
-
- String[] importedIntents =
DistributionUtilities.mergeExportedIntents(registrationProperties);
-
-
registrationProperties.remove(DistributionService.SERVICE_INTENTS_PROP);
-
registrationProperties.remove(DistributionService.SERVICE_EXPORTED_CONFIGS_PROP);
-
registrationProperties.remove(DistributionService.SERVICE_EXPORTED_INTERFACES_PROP);
-
registrationProperties.remove(DistributionService.SERVICE_EXPORTED_INTENTS_PROP);
-
registrationProperties.remove(DistributionService.SERVICE_EXPORTED_INTENTS_EXTRA_PROP);
-
- registrationProperties.put(DistributionService.SERVICE_IMPORTED_PROP,
"true");
- registrationProperties.put(DistributionService.SERVICE_INTENTS_PROP,
importedIntents);
-
registrationProperties.put(DistributionService.SERVICE_IMPORTED_CONFIGS_PROP,
+ private Component createLocalServiceComponent(final ServiceEndPoint
serviceEndpoint, final Object serviceObject) {
+ Hashtable<String, Object> props = serviceEndpoint.getProperties();
+ String[] importedIntents =
DistributionUtilities.mergeExportedIntents(props);
+ props.remove(DistributionService.SERVICE_INTENTS_PROP);
+ props.remove(DistributionService.SERVICE_EXPORTED_CONFIGS_PROP);
+ props.remove(DistributionService.SERVICE_EXPORTED_INTERFACES_PROP);
+ props.remove(DistributionService.SERVICE_EXPORTED_INTENTS_PROP);
+ props.remove(DistributionService.SERVICE_EXPORTED_INTENTS_EXTRA_PROP);
+ props.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP,
m_clusterGroupId);
+ props.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
+ props.put(DistributionService.SERVICE_IMPORTED_PROP, "true");
+ props.put(DistributionService.SERVICE_INTENTS_PROP, importedIntents);
+ props.put(DistributionService.SERVICE_IMPORTED_CONFIGS_PROP,
DistributionService.SERVICE_CONFIGURATION_TYPE);
-
- Component component = m_dependencyManager.createComponent()
- .setInterface(serviceEndPoint.getObjectClass(),
registrationProperties)
+ Component serviceComponent = m_dependencyManager.createComponent()
+ .setInterface(serviceEndpoint.getObjectClass(), props)
.setImplementation(serviceObject);
-
component.add(m_dependencyManager.createServiceDependency().setService(ClusterMemberService.class)
- .setRequired(true));
-
component.add(m_dependencyManager.createServiceDependency().setService(ClusterMessageService.class)
- .setRequired(true));
- return component;
+ ServiceDependency serviceDependency =
+ m_dependencyManager
+ .createServiceDependency()
+ .setService(
+ DistributionService.class,
+ "(&(" + ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP +
"="
+ + m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ + m_serviceGroupId + "))")
+ .setRequired(true);
+ serviceComponent.add(serviceDependency);
+ return serviceComponent;
}
- private boolean isServiceEndpointConfigurationSupported(ServiceEndPoint
serviceEndPoint) {
- if
(!DistributionUtilities.isConfigurationTypeSupported(serviceEndPoint.getProperties()))
{
+ private boolean isServiceEndpointConfigurationSupported(ServiceEndPoint
serviceEndpoint) {
+ if
(!DistributionUtilities.isConfigurationTypeSupported(serviceEndpoint.getProperties()))
{
System.err.println("No supported configuration type");
return false;
}
- if
(!DistributionUtilities.isExportedIntentsListSupported(serviceEndPoint.getProperties()))
{
+ if
(!DistributionUtilities.isExportedIntentsListSupported(serviceEndpoint.getProperties()))
{
System.err.println("Not all intents supported");
return false;
}
- if
(!DistributionUtilities.isExportedInterfacesSupported(serviceEndPoint.getProperties(),
+ if
(!DistributionUtilities.isExportedInterfacesSupported(serviceEndpoint.getProperties(),
new ClassLoaderAdaptor() {
public Class<?> loadClass(String className) throws
ClassNotFoundException {
return m_bundleContext.getBundle().loadClass(className);
@@ -375,13 +387,13 @@
return true;
}
- private ServiceEndPoint
serviceEndPointFromServiceReference(ServiceReference serviceReference) {
+ private ServiceEndPoint serviceEndPointFromServiceReference(final
ServiceReference serviceReference) {
ServiceEndPoint serviceEndPoint = new ServiceEndPoint();
- serviceEndPoint.setClusterId(m_clusterMemberService.getClusterId());
- serviceEndPoint.setMemberId(m_clusterMemberService.getMemberId());
+ serviceEndPoint.setClusterId(m_clusterGroupId);
+ serviceEndPoint.setServiceGroup(m_serviceGroupId);
serviceEndPoint.setObjectClass((String[]) serviceReference
.getProperty(DistributionService.SERVICE_EXPORTED_INTERFACES_PROP));
- serviceEndPoint.setOriginalServiceId((Long)
serviceReference.getProperty("service.id"));
+ serviceEndPoint.setOriginalServiceId((Long)
serviceReference.getProperty(Constants.SERVICE_ID));
Hashtable<String, Object> properties = new Hashtable<String, Object>();
for (String key : serviceReference.getPropertyKeys()) {
properties.put(key, serviceReference.getProperty(key));
@@ -390,6 +402,119 @@
return serviceEndPoint;
}
+ /**
+  * Processes an EndpointInvokeMessage received from the cluster.
+  * <p>
+  * The target ServiceEndPoint is looked up in the local endpoint registry while holding the read
+  * lock. When a registration is found the service is invoked; a non-null result is posted back as a
+  * response event through EventAdmin. When no registration is found the message is dropped with a
+  * warning.
+  *
+  * @param endpointInvokeMessage the invocation request to process
+  */
+ private void recieveEndpointInvokeMessage(EndpointInvokeMessage endpointInvokeMessage) {
+     ServiceEndPoint serviceEndPoint = endpointInvokeMessage.getServiceEndPoint();
+     //FIXME address this
+     // NOTE(review): presumably cleared so the endpoint matches the registry key - confirm.
+     serviceEndPoint.setMemberId(null);
+     Object serviceResponse = null;
+
+     m_localServiceEndPointServiceReferenceComponentsLock.readLock().lock();
+     try {
+         ServiceReferenceComponentTuple serviceReferenceComponentTuple =
+             m_localServiceEndPointServiceReferenceComponents.get(serviceEndPoint);
+         if (serviceReferenceComponentTuple == null) {
+             // FIXME check is not ok
+             // The member id was reset to null above, so it must not be dereferenced here:
+             // compare from the non-null side to avoid a NullPointerException.
+             if (m_clusterGroupId != null && m_clusterGroupId.equals(serviceEndPoint.getMemberId())) {
+                 m_logService.log(LogService.LOG_WARNING,
+                     "Dropping EndpointInvokeMessage for unknown ServiceEndPoint: " + serviceEndPoint.toString());
+             }
+             else {
+                 m_logService.log(LogService.LOG_WARNING,
+                     "Dropping EndpointInvokeMessage for other clustermember: " + serviceEndPoint.toString());
+             }
+             return;
+         }
+         // Invoked under the read lock so the registration cannot be removed mid-call.
+         serviceResponse =
+             invokeService(endpointInvokeMessage, serviceReferenceComponentTuple.getServiceReference());
+     }
+     finally {
+         m_localServiceEndPointServiceReferenceComponentsLock.readLock().unlock();
+     }
+     // Only non-null results are answered; void/null results and failed invocations send nothing.
+     if (serviceResponse != null) {
+         m_eventAdmin.postEvent(createEndpointResponseEvent(endpointInvokeMessage, serviceResponse));
+     }
+ }
+
+ /**
+  * Reflectively invokes the method described by the message payload on the local service.
+  * <p>
+  * The method name and arguments are read from the payload; parameter types are derived from the
+  * arguments via DistributionUtilities.getTypesFromArgs. The service object is released again
+  * once the invocation attempt has finished, whether or not it succeeded.
+  *
+  * @param endpointInvokeMessage the invocation request carrying method name and arguments
+  * @param serviceReference reference to the local service to invoke
+  * @return the value returned by the service method, or null when the service is unavailable
+  *         or the invocation failed (failures are logged, not rethrown)
+  */
+ private Object invokeService(final EndpointInvokeMessage endpointInvokeMessage,
+     final ServiceReference serviceReference) {
+     final ServiceEndPoint targetEndPoint = endpointInvokeMessage.getServiceEndPoint();
+     final Map<String, Object> invocation = endpointInvokeMessage.getPayload();
+     final String method = (String) invocation.get(MESSAGE_INVOCATION_METHODNAME_KEY);
+     final Object[] arguments = (Object[]) invocation.get(MESSAGE_INVOCATION_ARGUMENTS_KEY);
+     final Class<?>[] parameterTypes = DistributionUtilities.getTypesFromArgs(arguments);
+
+     final Object target = m_bundleContext.getService(serviceReference);
+     if (target == null) {
+         m_logService.log(LogService.LOG_WARNING, "Dropping EndpointInvokeMessage for unavailable service: "
+             + targetEndPoint.toString());
+         return null;
+     }
+
+     // Stays null when any step of the reflective call fails.
+     Object result = null;
+     try {
+         result = target.getClass().getMethod(method, parameterTypes).invoke(target, arguments);
+     }
+     catch (SecurityException e) {
+         m_logService.log(LogService.LOG_ERROR, "Exception during local service invocation", e);
+     }
+     catch (NoSuchMethodException e) {
+         m_logService.log(LogService.LOG_ERROR, "Exception during local service invocation", e);
+     }
+     catch (IllegalArgumentException e) {
+         m_logService.log(LogService.LOG_ERROR, "Exception during local service invocation", e);
+     }
+     catch (IllegalAccessException e) {
+         m_logService.log(LogService.LOG_ERROR, "Exception during local service invocation", e);
+     }
+     catch (InvocationTargetException e) {
+         m_logService.log(LogService.LOG_ERROR, "Exception during local service invocation", e);
+     }
+     finally {
+         m_bundleContext.ungetService(serviceReference);
+     }
+     return result;
+ }
+
+ /**
+  * Wraps a service result in the event that carries it back towards the invoking party.
+  * <p>
+  * The response payload echoes the invocation id of the request and holds the result; it is
+  * addressed with the origin cluster id, member id and service group taken from the request and
+  * published on this instance's cluster group broadcast topic.
+  *
+  * @param endpointInvokeMessage the request being answered
+  * @param serviceResponse the value returned by the invoked service
+  * @return the event to hand to EventAdmin
+  */
+ private Event createEndpointResponseEvent(EndpointInvokeMessage endpointInvokeMessage, Object serviceResponse) {
+     final String requestInvocationId =
+         (String) endpointInvokeMessage.getPayload().get(MESSAGE_INVOCATION_ID_KEY);
+     final String targetClusterId = (String) endpointInvokeMessage.getOriginClusterId();
+     final String targetMemberId = (String) endpointInvokeMessage.getOriginMemberId();
+     final String targetServiceGroup = (String) endpointInvokeMessage.getOriginServiceGroup();
+
+     final Map<String, Object> result = new HashMap<String, Object>();
+     result.put(MESSAGE_INVOCATION_ID_KEY, requestInvocationId);
+     result.put(MESSAGE_INVOCATION_RESPONSE_MAP_KEY, serviceResponse);
+
+     final EndpointResponseMessage responseMessage =
+         new EndpointResponseMessage(targetClusterId, targetMemberId, targetServiceGroup, result);
+
+     final Dictionary<String, Object> eventProperties = new Hashtable<String, Object>();
+     eventProperties.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, responseMessage);
+
+     final String topic = ClusterMemberService.EVENT_TOPIC_BROADCAST + "/" + m_clusterGroupId;
+     return new Event(topic, eventProperties);
+ }
+
+ /********************************************************
+ * Helper classes
+ ********************************************************/
+
+ /**
+  * EventHandler that forwards cluster messages delivered through EventAdmin to the enclosing
+  * service. Only EndpointInvokeMessage payloads are understood.
+  */
+ class DistributionEventHandler implements EventHandler {
+
+     /**
+      * Dispatches the message stored under ClusterMemberService.EVENT_MESSAGE_PROPERTY.
+      *
+      * @param event the received event
+      * @throws IllegalStateException when the event carries no message or one of an unknown type
+      */
+     public void handleEvent(Event event) {
+         Object message = event.getProperty(ClusterMemberService.EVENT_MESSAGE_PROPERTY);
+         if (message instanceof EndpointInvokeMessage) {
+             recieveEndpointInvokeMessage((EndpointInvokeMessage) message);
+             return;
+         }
+         // An event without the message property yields null; describe it instead of
+         // failing with a NullPointerException while building the error text.
+         String messageType = (message == null) ? "null" : message.getClass().getName();
+         throw new IllegalStateException("Unknown message type " + messageType + " on channel "
+             + REMOTE_TOPIC);
+     }
+ }
+
static class ServiceReferenceComponentTuple {
private final ServiceReference m_serviceReference;
private final Component m_component;
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
==============================================================================
--- (empty file)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
Fri Dec 31 17:13:10 2010
@@ -0,0 +1,267 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.service;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.amdatu.core.fabric.FabricManagerService;
+import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import
org.amdatu.core.fabric.cluster.service.tribes.TribesClusterMemberServiceImpl;
+import org.amdatu.core.fabric.remote.DiscoveryService;
+import org.amdatu.core.fabric.remote.DistributionService;
+import org.amdatu.core.fabric.remote.service.DiscoveryServiceImpl;
+import org.amdatu.core.fabric.remote.service.DistributionServiceImpl;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.apache.felix.dm.ServiceDependency;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.log.LogService;
+
+/**
+ * FabricManagerService implementation that creates and removes the cluster member, discovery and
+ * distribution components of the fabric, either through the FabricManagerService methods or from
+ * configuration passed to {@link #updated(Dictionary)}.
+ * <p>
+ * Each kind of component is tracked in its own map, keyed by {@link #getKey}, and every access to
+ * a map happens while holding the write lock that belongs to it.
+ */
+public class FabricManagerServiceImpl implements FabricManagerService {
+
+    // Cluster member components by key; guarded by m_clusterMemberComponentsLock.
+    private final Map<String, Component> m_clusterMemberComponents = new HashMap<String, Component>();
+    private final ReentrantReadWriteLock m_clusterMemberComponentsLock = new ReentrantReadWriteLock();
+
+    // Discovery components by key; guarded by m_discoveryComponentsLock.
+    private final Map<String, Component> m_discoveryComponents = new HashMap<String, Component>();
+    private final ReentrantReadWriteLock m_discoveryComponentsLock = new ReentrantReadWriteLock();
+
+    // Distribution components by key; guarded by m_distributionComponentsLock.
+    private final Map<String, Component> m_distributionComponents = new HashMap<String, Component>();
+    private final ReentrantReadWriteLock m_distributionComponentsLock = new ReentrantReadWriteLock();
+
+    // NOTE(review): never assigned in this class - presumably injected by the dependency manager.
+    private volatile DependencyManager m_dependencyManager;
+    private volatile Component m_component;
+    private volatile LogService m_logService;
+
+    /********************************************************
+     * Service lifecycle
+     ********************************************************/
+
+    /**
+     * Adds a required LogService dependency to this service's own component.
+     */
+    public final synchronized void init() {
+        ServiceDependency logServiceDependency = m_dependencyManager.createServiceDependency();
+        logServiceDependency.setService(LogService.class);
+        logServiceDependency.setRequired(true);
+        m_component.add(logServiceDependency);
+    }
+
+    /**
+     * Lifecycle callback; nothing to release at the moment.
+     */
+    public final synchronized void destroy() {
+    }
+
+    /**
+     * Logs that the fabric manager has started.
+     */
+    public synchronized void start() {
+        m_logService.log(LogService.LOG_WARNING, "Amdatu Service Fabric Manager started");
+    }
+
+    /**
+     * Logs that the fabric manager has stopped.
+     */
+    public synchronized void stop() {
+        m_logService.log(LogService.LOG_WARNING, "Amdatu Service Fabric Manager stopped");
+    }
+
+    /********************************************************
+     * ManagedService (controlled by dependencymanager)
+     ********************************************************/
+
+    /**
+     * Creates the cluster members and service groups described by the configuration.
+     * <p>
+     * <code>org.amdatu.fabric.groupchannels</code> holds a comma separated list of names; for each
+     * name the <code>.groupchannel</code>, <code>.memberid</code> and <code>.tribes.args</code>
+     * properties describe one cluster member. <code>org.amdatu.fabric.servicegroups</code> holds a
+     * comma separated list of names; for each name the <code>.groupchannel</code> and
+     * <code>.servicegroup</code> properties describe one distribution/discovery pair. Components are
+     * only created or replaced; entries that disappeared from the configuration are not removed.
+     *
+     * @param dictionary the configuration, or null when there is none
+     * @throws ConfigurationException when a listed cluster has no <code>.tribes.args</code> value
+     */
+    public synchronized void updated(Dictionary<String, Object> dictionary) throws ConfigurationException {
+        if (dictionary != null) {
+            String clusternamesValue = (String) dictionary.get("org.amdatu.fabric.groupchannels");
+            if (clusternamesValue != null) {
+                for (String clusterName : clusternamesValue.split(",")) {
+                    String clusterGroupId =
+                        (String) dictionary.get("org.amdatu.fabric." + clusterName + ".groupchannel");
+                    String clusterMemberId =
+                        (String) dictionary.get("org.amdatu.fabric." + clusterName + ".memberid");
+                    String argsProperty = "org.amdatu.fabric." + clusterName + ".tribes.args";
+                    String args = (String) dictionary.get(argsProperty);
+                    if (args == null) {
+                        // Report the offending property instead of failing with a
+                        // NullPointerException on args.split().
+                        throw new ConfigurationException(argsProperty,
+                            "Missing Tribes arguments for cluster " + clusterName);
+                    }
+                    Dictionary<String, Object> properties = new Hashtable<String, Object>();
+                    properties.put(TribesClusterMemberServiceImpl.CLUSTER_TRIBES_ARGS_PROP, args.split(" "));
+
+                    createClusterMember(clusterGroupId, clusterMemberId, properties);
+                }
+            }
+
+            String remotenamesValue = (String) dictionary.get("org.amdatu.fabric.servicegroups");
+            if (remotenamesValue != null) {
+                for (String remoteName : remotenamesValue.split(",")) {
+                    String clusterGroupId =
+                        (String) dictionary.get("org.amdatu.fabric." + remoteName + ".groupchannel");
+                    String serviceGroupId =
+                        (String) dictionary.get("org.amdatu.fabric." + remoteName + ".servicegroup");
+
+                    createDistribution(clusterGroupId, serviceGroupId);
+                    createDiscovery(clusterGroupId, serviceGroupId);
+                }
+            }
+        }
+        // The log service may not be available yet when the configuration arrives.
+        if (m_logService != null) {
+            m_logService.log(LogService.LOG_WARNING, "Amdatu Fabric updated");
+        }
+    }
+
+    /********************************************************
+     * FabricManagerService interface
+     ********************************************************/
+
+    /**
+     * Creates a Tribes based cluster member and publishes it as a ClusterMemberService. A member
+     * already registered for the same group and member id is replaced.
+     * <p>
+     * The service is registered with the cluster group id and cluster member id, in addition to any
+     * entries of <code>properties</code>.
+     *
+     * @param clusterGroupId id of the cluster group to join; must not be null or empty
+     * @param clusterMemberId id of the member within the group; must not be null or empty
+     * @param properties implementation properties passed to the cluster member, may be null
+     * @return true when the member was created, false when an id was null or empty
+     */
+    public boolean createClusterMember(String clusterGroupId, String clusterMemberId,
+        Dictionary<String, Object> properties) {
+        if (clusterMemberId == null || "".equals(clusterMemberId)) {
+            return false;
+        }
+        // The group id ends up in a Hashtable, which rejects null values.
+        if (clusterGroupId == null || "".equals(clusterGroupId)) {
+            return false;
+        }
+
+        Dictionary<String, Object> svcProperties = new Hashtable<String, Object>();
+        if (properties != null) {
+            // Keep publishing the caller supplied entries, as the registration did before.
+            for (java.util.Enumeration<String> keys = properties.keys(); keys.hasMoreElements();) {
+                String key = keys.nextElement();
+                svcProperties.put(key, properties.get(key));
+            }
+        }
+        svcProperties.put(ClusterMemberService.CLUSTER_CLUSTERGROUP_PROP, clusterGroupId);
+        svcProperties.put(ClusterMemberService.CLUSTER_CLUSTERMEMBER_PROP, clusterMemberId);
+
+        Component clusterMemberComponent =
+            createManagedComponent(ClusterMemberService.class.getName(), svcProperties,
+                new TribesClusterMemberServiceImpl(clusterGroupId, clusterMemberId, properties));
+
+        replaceComponent(m_clusterMemberComponents, m_clusterMemberComponentsLock,
+            getKey(clusterGroupId, clusterMemberId, ""), clusterMemberComponent);
+        return true;
+    }
+
+    /**
+     * Removes the cluster member registered for the given group and member id.
+     *
+     * @param clusterGroupId id of the cluster group
+     * @param clusterMemberId id of the member; must not be null or empty
+     * @return true when a member was removed, false otherwise
+     */
+    public boolean removeClusterMember(String clusterGroupId, String clusterMemberId) {
+        if (clusterMemberId == null || "".equals(clusterMemberId)) {
+            return false;
+        }
+        return removeComponent(m_clusterMemberComponents, m_clusterMemberComponentsLock,
+            getKey(clusterGroupId, clusterMemberId, ""));
+    }
+
+    /**
+     * Creates a DiscoveryService for the given cluster group and service group, replacing one that
+     * is already registered for the same pair.
+     *
+     * @param clusterGroupId id of the cluster group
+     * @param serviceGroupId id of the service group
+     * @return always true
+     */
+    public boolean createDiscovery(String clusterGroupId, String serviceGroupId) {
+        // FIXME improve checking
+
+        Component discoveryComponent =
+            createManagedComponent(DiscoveryService.class.getName(), null,
+                new DiscoveryServiceImpl(clusterGroupId, serviceGroupId));
+
+        replaceComponent(m_discoveryComponents, m_discoveryComponentsLock,
+            getKey(clusterGroupId, "", serviceGroupId), discoveryComponent);
+        return true;
+    }
+
+    /**
+     * Removes the DiscoveryService registered for the given cluster group and service group.
+     *
+     * @param clusterGroupId id of the cluster group
+     * @param serviceGroupId id of the service group
+     * @return true when a discovery component was removed, false otherwise
+     */
+    public boolean removeDiscovery(String clusterGroupId, String serviceGroupId) {
+        // FIXME improve checks
+
+        return removeComponent(m_discoveryComponents, m_discoveryComponentsLock,
+            getKey(clusterGroupId, "", serviceGroupId));
+    }
+
+    /**
+     * Creates a DistributionService for the given cluster group and service group, replacing one
+     * that is already registered for the same pair.
+     *
+     * @param clusterGroupId id of the cluster group
+     * @param serviceGroupId id of the service group
+     * @return always true
+     */
+    public boolean createDistribution(String clusterGroupId, String serviceGroupId) {
+        // FIXME improve checks
+
+        Component distributionComponent =
+            createManagedComponent(DistributionService.class.getName(), null,
+                new DistributionServiceImpl(clusterGroupId, serviceGroupId));
+
+        replaceComponent(m_distributionComponents, m_distributionComponentsLock,
+            getKey(clusterGroupId, "", serviceGroupId), distributionComponent);
+        return true;
+    }
+
+    /**
+     * Removes the DistributionService registered for the given cluster group and service group.
+     *
+     * @param clusterGroupId id of the cluster group
+     * @param serviceGroupId id of the service group
+     * @return true when a distribution component was removed, false otherwise
+     */
+    public boolean removeDistribution(String clusterGroupId, String serviceGroupId) {
+        // FIXME improve checks
+
+        return removeComponent(m_distributionComponents, m_distributionComponentsLock,
+            getKey(clusterGroupId, "", serviceGroupId));
+    }
+
+    /********************************************************
+     * Private methods
+     ********************************************************/
+
+    // Creates a component for the given service that requires the FabricManagerService.
+    private Component createManagedComponent(String interfaceName, Dictionary<String, Object> properties,
+        Object implementation) {
+        return m_dependencyManager.createComponent()
+            .setInterface(interfaceName, properties)
+            .setImplementation(implementation)
+            .add(
+                m_dependencyManager.createServiceDependency()
+                    .setService(FabricManagerService.class)
+                    .setRequired(true));
+    }
+
+    // Registers the component under key, first removing any component already stored there.
+    private void replaceComponent(Map<String, Component> components, ReentrantReadWriteLock lock, String key,
+        Component component) {
+        lock.writeLock().lock();
+        try {
+            Component previous = components.remove(key);
+            if (previous != null) {
+                m_dependencyManager.remove(previous);
+            }
+            components.put(key, component);
+            m_dependencyManager.add(component);
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // Removes the component stored under key; returns whether there was one.
+    private boolean removeComponent(Map<String, Component> components, ReentrantReadWriteLock lock, String key) {
+        lock.writeLock().lock();
+        try {
+            Component previous = components.remove(key);
+            if (previous == null) {
+                return false;
+            }
+            m_dependencyManager.remove(previous);
+            return true;
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // Builds the map key "<clusterGroupId>#<clusterMemberId>#<serviceGroupId>".
+    private String getKey(String clusterGroupId, String clusterMemberId, String serviceGroupId) {
+        return clusterGroupId + "#" + clusterMemberId + "#" + serviceGroupId;
+    }
+}