Author: bdekruijff at gmail.com
Date: Tue Dec 21 18:33:02 2010
New Revision: 524
Log:
[sandbox] improved concurrency/performance/logging, testcode moved out
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java
Removed:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/FabricTestService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/NonRemotableService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/RemotableService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/internal/
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/service/
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/shell/
Modified:
sandbox/bdekruijff/fabric/pom.xml
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMessageServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
Modified: sandbox/bdekruijff/fabric/pom.xml
==============================================================================
--- sandbox/bdekruijff/fabric/pom.xml (original)
+++ sandbox/bdekruijff/fabric/pom.xml Tue Dec 21 18:33:02 2010
@@ -42,6 +42,7 @@
<instructions>
<Bundle-Activator>org.amdatu.core.fabric.osgi.Activator</Bundle-Activator>
<Bundle-SymbolicName>org.amdatu.core.fabric</Bundle-SymbolicName>
+ <DynamicImport-Package>*</DynamicImport-Package>
<Export-Package>org.amdatu.core.fabric.cluster,
org.amdatu.core.fabric.remote</Export-Package>
<Embed-Dependency>*;scope=compile</Embed-Dependency>
<Embed-Transitive>true</Embed-Transitive>
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
Tue Dec 21 18:33:02 2010
@@ -31,8 +31,12 @@
ClusterMember[] getClusterMembers();
+ ClusterMember getClusterMember(String memberId);
+
void broadcast(Object message);
+ void send(ClusterMember[] clusterMembers, Object message);
+
void subscribe(ClusterMessageListener clusterMessageListener);
void unsubscribe(ClusterMessageListener clusterMessageListener);
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java
==============================================================================
--- (empty file)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/RoutedMessage.java
Tue Dec 21 18:33:02 2010
@@ -0,0 +1,27 @@
+package org.amdatu.core.fabric.cluster.internal;
+
+import java.io.Serializable;
+
+public abstract class RoutedMessage implements Serializable {
+
+ private final String m_targetClusterId;
+ private final String[] m_targetMemberIds;
+
+ public RoutedMessage(final String clusterId, final String memberId) {
+ m_targetClusterId = clusterId;
+ m_targetMemberIds = new String[] { memberId };
+ }
+
+ public RoutedMessage(final String clusterId, String[] memberIds) {
+ m_targetClusterId = clusterId;
+ m_targetMemberIds = memberIds;
+ }
+
+ public final String getClusterId() {
+ return m_targetClusterId;
+ }
+
+ public final String[] getMemberIds() {
+ return m_targetMemberIds;
+ }
+}
\ No newline at end of file
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/AbstractClusterMemberService.java
Tue Dec 21 18:33:02 2010
@@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMember;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
@@ -35,9 +36,12 @@
*/
public abstract class AbstractClusterMemberService implements
ClusterMemberService {
- private final Set<ClusterMember> m_clusterMembers = new
HashSet<ClusterMember>();
+ private final Map<String, ClusterMember> m_clusterMembers = new
HashMap<String, ClusterMember>();
private final Set<ClusterMessageListener> m_clusterMessageListeners = new
HashSet<ClusterMessageListener>();
+ private final ReentrantReadWriteLock m_clusterMembersLock = new
ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock m_clusterMessageListenersLock = new
ReentrantReadWriteLock();
+
private final ExecutorService m_executorService =
Executors.newFixedThreadPool(1);
private final String m_clusterId;
@@ -79,8 +83,22 @@
}
public final ClusterMember[] getClusterMembers() {
- synchronized (m_clusterMembers) {
- return m_clusterMembers.toArray(new
ClusterMember[m_clusterMembers.size()]);
+ m_clusterMembersLock.readLock().lock();
+ try {
+ return m_clusterMembers.values().toArray(new
ClusterMember[m_clusterMembers.size()]);
+ }
+ finally {
+ m_clusterMembersLock.readLock().unlock();
+ }
+ }
+
+ public final ClusterMember getClusterMember(String memberId) {
+ m_clusterMembersLock.readLock().lock();
+ try {
+ return m_clusterMembers.get(memberId);
+ }
+ finally {
+ m_clusterMembersLock.readLock().unlock();
}
}
@@ -88,17 +106,29 @@
m_executorService.submit(new BroadcastRunnable(message));
}
+ public final void send(ClusterMember[] clusterMembers, Object message) {
+ m_executorService.submit(new SendRunnable(clusterMembers, message));
+ }
+
public final void subscribe(ClusterMessageListener clusterMessageListener)
{
- synchronized (m_clusterMessageListeners) {
+ m_clusterMessageListenersLock.writeLock().lock();
+ try {
m_clusterMessageListeners.add(clusterMessageListener);
}
+ finally {
+ m_clusterMessageListenersLock.writeLock().unlock();
+ }
onSubscribe(clusterMessageListener);
}
public final void unsubscribe(ClusterMessageListener
clusterMessageListener) {
- synchronized (m_clusterMessageListeners) {
+ m_clusterMessageListenersLock.writeLock().lock();
+ try {
m_clusterMessageListeners.remove(clusterMessageListener);
}
+ finally {
+ m_clusterMessageListenersLock.writeLock().unlock();
+ }
onUnsubscribe(clusterMessageListener);
}
@@ -107,23 +137,35 @@
********************************************************/
protected final void addClusterMember(ClusterMember clusterMember) {
- synchronized (m_clusterMembers) {
- m_clusterMembers.add(clusterMember);
+ m_clusterMembersLock.writeLock().lock();
+ try {
+ m_clusterMembers.put(clusterMember.getId(), clusterMember);
+ }
+ finally {
+ m_clusterMembersLock.writeLock().unlock();
}
}
protected final void removeClusterMember(ClusterMember clusterMember) {
- synchronized (m_clusterMembers) {
- m_clusterMembers.remove(clusterMember);
+ m_clusterMembersLock.writeLock().lock();
+ try {
+ m_clusterMembers.remove(clusterMember.getId());
+ }
+ finally {
+ m_clusterMembersLock.writeLock().unlock();
}
}
protected final void dispatchMessage(Object message) {
- synchronized (m_clusterMessageListeners) {
+ m_clusterMessageListenersLock.readLock().lock();
+ try {
for (ClusterMessageListener clm : m_clusterMessageListeners) {
m_executorService.submit(new MessageDispatchRunnable(clm,
message));
}
}
+ finally {
+ m_clusterMessageListenersLock.readLock().unlock();
+ }
}
protected void onSubscribe(ClusterMessageListener clusterMessageListener) {
@@ -134,6 +176,8 @@
public abstract void doBroadcast(Object message);
+ public abstract void doSend(ClusterMember[] clusterMember, Object message);
+
/********************************************************
* helper classes
********************************************************/
@@ -151,6 +195,21 @@
}
}
+ class SendRunnable implements Runnable {
+
+ private final ClusterMember[] m_clusterMember;
+ private final Object m_message;
+
+ public SendRunnable(final ClusterMember[] clusterMembers, final Object
message) {
+ m_clusterMember = clusterMembers;
+ m_message = message;
+ }
+
+ public void run() {
+ doSend(m_clusterMember, m_message);
+ }
+ }
+
static class MessageDispatchRunnable implements Runnable {
private final ClusterMessageListener m_clusterMessageListener;
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMessageServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMessageServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMessageServiceImpl.java
Tue Dec 21 18:33:02 2010
@@ -18,22 +18,35 @@
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.amdatu.core.fabric.cluster.ClusterMember;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
import org.amdatu.core.fabric.cluster.ClusterMessageListener;
import org.amdatu.core.fabric.cluster.ClusterMessageService;
import org.amdatu.core.fabric.cluster.ClusterTopicListener;
+import org.amdatu.core.fabric.cluster.internal.RoutedMessage;
import org.amdatu.core.fabric.cluster.internal.TopicMessageWrapper;
+import org.osgi.service.log.LogService;
public class ClusterMessageServiceImpl implements ClusterMessageService,
ClusterMessageListener {
private final Map<String, Set<ClusterTopicListener>>
m_clusterTopicListeners =
new HashMap<String, Set<ClusterTopicListener>>();
+ private final ReentrantReadWriteLock m_clusterTopicListenersLock = new
ReentrantReadWriteLock();
+
+ private final ExecutorService m_executorService =
Executors.newFixedThreadPool(1);
+
// injected
private volatile ClusterMemberService m_clusterMemberService;
+ private volatile LogService m_logService;
/********************************************************
* Constructors
@@ -59,31 +72,75 @@
* ClusterMessageService
********************************************************/
- public void publish(String topic, Object message) {
- m_clusterMemberService.broadcast(new TopicMessageWrapper(topic,
message));
+ public void publish(final String topic, final Object message) {
+ if (message instanceof RoutedMessage) {
+ RoutedMessage routedMessage = (RoutedMessage) message;
+ if (routedMessage.getClusterId() == null
+ ||
!routedMessage.getClusterId().equals(m_clusterMemberService.getClusterId())) {
+ m_logService.log(LogService.LOG_ERROR,
+ "RoutedMessage is not for this cluster: " +
routedMessage.getClusterId());
+ return;
+ }
+ if (routedMessage.getMemberIds() == null ||
routedMessage.getMemberIds().length == 0) {
+ m_logService.log(LogService.LOG_ERROR, "RoutedMessage does not
specify any target members");
+ return;
+ }
+ List<ClusterMember> clusterMembers = new
LinkedList<ClusterMember>();
+ for (String memberId : routedMessage.getMemberIds()) {
+ ClusterMember clusterMember =
m_clusterMemberService.getClusterMember(memberId);
+ if (clusterMember != null) {
+ clusterMembers.add(clusterMember);
+ }
+ else {
+ m_logService.log(LogService.LOG_WARNING, "RoutedMessage
specifies unknown target member: "
+ + memberId);
+ }
+ }
+ if (clusterMembers.size() > 0) {
+ m_clusterMemberService.send(clusterMembers.toArray(new
ClusterMember[clusterMembers.size()]),
+ new TopicMessageWrapper(topic, message));
+ return;
+ }
+ m_logService.log(LogService.LOG_WARNING, "RoutedMessage has no
available target members left");
+ // send dropped message to avoid blocking
+ }
+ else {
+ m_clusterMemberService.broadcast(new TopicMessageWrapper(topic,
message));
+ }
}
- public void subscribe(ClusterTopicListener clusterTopicListener) {
+ public void subscribe(final ClusterTopicListener clusterTopicListener) {
if (clusterTopicListener.getTopic() == null ||
clusterTopicListener.getTopic().equals("")) {
return;
}
- synchronized (m_clusterTopicListeners) {
+ m_clusterTopicListenersLock.writeLock().lock();
+ try {
if
(!m_clusterTopicListeners.containsKey(clusterTopicListener.getTopic())) {
m_clusterTopicListeners.put(clusterTopicListener.getTopic(),
new HashSet<ClusterTopicListener>());
}
m_clusterTopicListeners.get(clusterTopicListener.getTopic()).add(clusterTopicListener);
}
+ finally {
+ m_clusterTopicListenersLock.writeLock().unlock();
+ }
}
public void unsubscribe(ClusterTopicListener clusterTopicListener) {
if (clusterTopicListener.getTopic() == null ||
clusterTopicListener.getTopic().equals("")) {
return;
}
- synchronized (m_clusterTopicListeners) {
+ m_clusterTopicListenersLock.writeLock().lock();
+ try {
if
(m_clusterTopicListeners.containsKey(clusterTopicListener.getTopic())) {
m_clusterTopicListeners.get(clusterTopicListener.getTopic()).remove(clusterTopicListener);
+ if
(m_clusterTopicListeners.get(clusterTopicListener.getTopic()).size() == 0) {
+
m_clusterTopicListeners.remove(clusterTopicListener.getTopic());
+ }
}
}
+ finally {
+ m_clusterTopicListenersLock.writeLock().unlock();
+ }
}
/********************************************************
@@ -93,16 +150,38 @@
public void recieveMessage(Object message) {
if (message instanceof TopicMessageWrapper) {
TopicMessageWrapper topicMessageWrapper = (TopicMessageWrapper)
message;
- synchronized (m_clusterTopicListeners) {
+ m_clusterTopicListenersLock.readLock().lock();
+ try {
if
(m_clusterTopicListeners.containsKey(topicMessageWrapper.getTopic())) {
- // TODO async
for (ClusterTopicListener clusterTopicListener :
m_clusterTopicListeners.get(topicMessageWrapper
.getTopic())) {
- // TODO clone?
-
clusterTopicListener.recieveMessage(topicMessageWrapper.getMessage());
+ m_executorService.submit(new
MessageDispatchRunnable(clusterTopicListener, topicMessageWrapper
+ .getMessage()));
}
}
}
+ finally {
+ m_clusterTopicListenersLock.readLock().unlock();
+ }
+ }
+ }
+
+ /********************************************************
+ * helper classes
+ ********************************************************/
+
+ static class MessageDispatchRunnable implements Runnable {
+
+ private final ClusterTopicListener m_clusterTopicListener;
+ private final Object m_message;
+
+ public MessageDispatchRunnable(final ClusterTopicListener
clusterTopicListener, final Object message) {
+ m_clusterTopicListener = clusterTopicListener;
+ m_message = message;
+ }
+
+ public void run() {
+ m_clusterTopicListener.recieveMessage(m_message);
}
}
}
\ No newline at end of file
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/ClusterMemberServiceImpl.java
Tue Dec 21 18:33:02 2010
@@ -21,8 +21,14 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.amdatu.core.fabric.cluster.ClusterMember;
import org.amdatu.core.fabric.cluster.internal.ClusterMemberImpl;
import org.amdatu.core.fabric.cluster.internal.tribes.ChannelCreator;
import org.amdatu.core.fabric.cluster.service.AbstractClusterMemberService;
@@ -32,14 +38,16 @@
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.util.Arrays;
-import org.apache.catalina.tribes.util.UUIDGenerator;
public final class ClusterMemberServiceImpl extends
AbstractClusterMemberService implements MembershipListener,
ChannelListener {
public static final String CLUSTER_TRIBES_ARGS_PROP =
"org.amdatu.fabric.cluster.tribes.args";
+ private final Map<ClusterMember, Member> m_clusterMemberMembers = new
HashMap<ClusterMember, Member>();
+
+ private final ReentrantReadWriteLock m_clusterMemberMembersLock = new
ReentrantReadWriteLock();
+
private ManagedChannel m_managedChannel;
/********************************************************
@@ -61,14 +69,12 @@
CLUSTER_TRIBES_ARGS_PROP));
Properties props = new Properties();
- props.setProperty(CLUSTER_MEMBERID_PROP,
Arrays.toString(UUIDGenerator.randomUUID(true)));
+ props.setProperty(CLUSTER_MEMBERID_PROP, getMemberId());
m_managedChannel.addMembershipListener(this);
m_managedChannel.addChannelListener(this);
m_managedChannel.getMembershipService().setPayload(getPayload(props));
-//
m_managedChannel.getMembershipService().setDomain("Amdatu".getBytes(Charset.forName("ISO-8859-1")));
-
m_managedChannel.start(Channel.DEFAULT);
}
catch (Exception e) {
@@ -105,15 +111,46 @@
}
}
+ @Override
+ public void doSend(ClusterMember[] clusterMembers, Object message) {
+ // TODO check and wrap message. Look into send options
+ if (message instanceof Serializable) {
+ try {
+ List<Member> members = new LinkedList<Member>();
+ synchronized (m_clusterMemberMembers) {
+ for (ClusterMember clusterMember : clusterMembers) {
+ Member member =
m_clusterMemberMembers.get(clusterMember);
+ if (member != null) {
+ members.add(member);
+ }
+ }
+ }
+ if (members.size() > 0)
+ m_managedChannel.send(members.toArray(new
Member[members.size()]), (Serializable) message,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS);
+ }
+ catch (ChannelException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
/********************************************************
* MembershipListener
********************************************************/
public void memberAdded(Member member) {
try {
- System.out.println("Received member added:" + member);
- addClusterMember(new
ClusterMemberImpl(getProperties(member.getPayload())
- .getProperty(CLUSTER_MEMBERID_PROP)));
+ ClusterMember clusterMember = new
ClusterMemberImpl(getProperties(member.getPayload())
+ .getProperty(CLUSTER_MEMBERID_PROP));
+ m_clusterMemberMembersLock.writeLock().lock();
+ try {
+ m_clusterMemberMembers.put(clusterMember, member);
+ }
+ finally {
+ m_clusterMemberMembersLock.writeLock().unlock();
+ }
+ addClusterMember(clusterMember);
}
catch (Exception x) {
x.printStackTrace();
@@ -122,9 +159,26 @@
public void memberDisappeared(Member member) {
try {
- System.out.println("Received member disappeared:" + member);
- removeClusterMember(new
ClusterMemberImpl(getProperties(member.getPayload())
- .getProperty(CLUSTER_MEMBERID_PROP)));
+ // FIXME use a memberId-to-tuple map to reduce object creation
+ String memberId = getProperties(member.getPayload())
+ .getProperty(CLUSTER_MEMBERID_PROP);
+ ClusterMember toBeRemoved = null;
+ m_clusterMemberMembersLock.writeLock().lock();
+ try {
+ for (ClusterMember clusterMember :
m_clusterMemberMembers.keySet()) {
+ if (clusterMember.getId().equals(memberId)) {
+ toBeRemoved = clusterMember;
+ break;
+ }
+ }
+ if (toBeRemoved != null) {
+ m_clusterMemberMembers.remove(toBeRemoved);
+ }
+ }
+ finally {
+ m_clusterMemberMembersLock.writeLock().unlock();
+ }
+ removeClusterMember(toBeRemoved);
}
catch (Exception x) {
x.printStackTrace();
@@ -159,4 +213,23 @@
props.load(bin);
return props;
}
+
+ static class ClusterMemberMemberTuple {
+
+ private final ClusterMember m_clusterMember;
+ private final Member m_member;
+
+ public ClusterMemberMemberTuple(final ClusterMember clusterMember,
final Member member) {
+ m_clusterMember = clusterMember;
+ m_member = member;
+ }
+
+ public ClusterMember getClusterMember() {
+ return m_clusterMember;
+ }
+
+ public Member getMember() {
+ return m_member;
+ }
+ }
}
\ No newline at end of file
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java
Tue Dec 21 18:33:02 2010
@@ -33,10 +33,6 @@
import org.amdatu.core.fabric.remote.service.DiscoveryServiceImpl;
import org.amdatu.core.fabric.remote.service.DistributionServiceImpl;
import org.amdatu.core.fabric.remote.shell.DiscoveryListCommand;
-import org.amdatu.core.fabric.test.FabricTestService;
-import org.amdatu.core.fabric.test.RemotableService;
-import org.amdatu.core.fabric.test.service.FabricTestServiceImpl;
-import org.amdatu.core.fabric.test.shell.FabricTestCommand;
import org.apache.felix.dm.DependencyActivatorBase;
import org.apache.felix.dm.DependencyManager;
import org.apache.felix.shell.Command;
@@ -115,26 +111,7 @@
.setRequired(false))
.add(createServiceDependency().setService(LogService.class).setRequired(false)));
- // Test & Shell
-
- manager.add(
- createComponent()
- .setImplementation(new FabricTestServiceImpl())
- .setInterface(FabricTestService.class.getName(), null)
-
.add(createServiceDependency().setService(LogService.class).setRequired(false)));
-
- manager.add(
- createComponent()
- .setImplementation(new FabricTestCommand())
- .setInterface(Command.class.getName(), null)
- .add(
-
createServiceDependency().setService(FabricTestService.class).setRequired(true))
- .add(
-
createServiceDependency().setService(RemotableService.class).setRequired(false)
- .setCallbacks("remotableServiceAdded",
"remotableServiceRemoved"))
- .add(
-
createServiceDependency().setService(LogService.class).setRequired(false)
- ));
+ // Shell
manager.add(
createComponent()
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java
Tue Dec 21 18:33:02 2010
@@ -38,7 +38,7 @@
final static String SERVICE_CONFIGURATION_TYPE = "org.amdatu.core.fabric";
final static String SERVICE_EXPORTED_CONFIGS_PROP =
"service.exported.configs";
- final static String SERVICE_EXPORTED_INTENTS_PROP =
"service.exported.intent";
+ final static String SERVICE_EXPORTED_INTENTS_PROP =
"service.exported.intents";
final static String SERVICE_EXPORTED_INTENTS_EXTRA_PROP =
"service.exported.intents.extra";
final static String SERVICE_EXPORTED_INTERFACES_PROP =
"service.exported.iterfaces";
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
Tue Dec 21 18:33:02 2010
@@ -26,7 +26,6 @@
private String m_memberId;
private String[] m_objectClass;
private long m_originalServiceId;
- private long m_localServiceId;
private Hashtable<String, Object> m_properties;
public ServiceEndPoint() {
@@ -64,14 +63,6 @@
m_originalServiceId = originalServiceId;
}
- public long getLocalServiceId() {
- return m_localServiceId;
- }
-
- public void setLocalServiceId(long localServiceId) {
- m_localServiceId = localServiceId;
- }
-
public Hashtable<String, Object> getProperties() {
return m_properties;
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
Tue Dec 21 18:33:02 2010
@@ -19,14 +19,16 @@
import java.io.Serializable;
import java.util.Map;
+import org.amdatu.core.fabric.cluster.internal.RoutedMessage;
import org.amdatu.core.fabric.remote.ServiceEndPoint;
-public class EndpointInvokeMessage implements Serializable {
+public class EndpointInvokeMessage extends RoutedMessage implements
Serializable {
private ServiceEndPoint m_serviceEndPoint;
private Map<String, Object> m_payload;
public EndpointInvokeMessage(ServiceEndPoint serviceEndPoint, Map<String,
Object> payload) {
+ super(serviceEndPoint.getClusterId(), serviceEndPoint.getMemberId());
m_serviceEndPoint = serviceEndPoint;
m_payload = payload;
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
Tue Dec 21 18:33:02 2010
@@ -19,22 +19,19 @@
import java.io.Serializable;
import java.util.Map;
+import org.amdatu.core.fabric.cluster.internal.RoutedMessage;
import org.amdatu.core.fabric.remote.ServiceEndPoint;
-public class EndpointResponseMessage implements Serializable {
+public class EndpointResponseMessage extends RoutedMessage implements
Serializable {
- private ServiceEndPoint m_serviceEndPoint;
private Map<String, Object> m_payload;
- public EndpointResponseMessage(ServiceEndPoint serviceEndPoint,
Map<String, Object> payload) {
- m_serviceEndPoint = serviceEndPoint;
+ public EndpointResponseMessage(final String originClusterId, final String
originMemberId,
+ final Map<String, Object> payload) {
+ super(originClusterId, originMemberId);
m_payload = payload;
}
- public ServiceEndPoint getServiceEndPoint() {
- return m_serviceEndPoint;
- }
-
public Map<String, Object> getPayload() {
return m_payload;
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
Tue Dec 21 18:33:02 2010
@@ -18,68 +18,89 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.amdatu.core.fabric.cluster.ClusterMemberService;
import org.amdatu.core.fabric.cluster.ClusterMessageService;
import org.amdatu.core.fabric.cluster.ClusterTopicListener;
import org.amdatu.core.fabric.remote.DistributionService;
import org.amdatu.core.fabric.remote.ServiceEndPoint;
+import org.amdatu.core.fabric.remote.service.DistributionServiceImpl;
/**
* I am a delegate to the DistributionService
* I proxy local service invocations and put them over the cluster
*/
-public class LocalServiceInvocationHandler implements InvocationHandler,
ClusterTopicListener {
+public final class LocalServiceInvocationHandler implements InvocationHandler,
ClusterTopicListener {
- private final static String MESSAGE_INVOCATION_ID_KEY = "MII";
- private final static String MESSAGE_INVOCATION_SERVICEENDPOINTINFO_KEY =
"MIEI";
- private final static String MESSAGE_INVOCATION_METHODNAME_KEY = "MIMK";
- private final static String MESSAGE_INVOCATION_ARGUMENTS_KEY = "MIAK";
- private final static String MESSAGE_RESPONSE_MAP_KEY = "MRM";
- private final static long INVOCATION_TIMEOUT = 10000;
+ private final static long INVOCATION_TIMEOUT = 100;
private final Set<String> m_invocationIdentifiers = new HashSet<String>();
- private final Map<String, Object> myInvocationResponses = new
HashMap<String, Object>();
+ private final Map<String, Object> m_invocationResponses = new
HashMap<String, Object>();
+ private final ReentrantReadWriteLock m_invocationIdentifiersLock = new
ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock m_invocationResponsesLock = new
ReentrantReadWriteLock();
+
+ private volatile ClusterMemberService m_clusterMemberService;
private volatile ClusterMessageService m_clusterMessageService;
private final ServiceEndPoint m_serviceEndpoint;
private final Class<?>[] m_interfaceClasses;
+ private final Set<Method> m_interfaceMethods = new HashSet<Method>();
private String m_serviceEndpointTopic;
+ /********************************************************
+ * Constructors
+ ********************************************************/
+
public LocalServiceInvocationHandler(ServiceEndPoint serviceEndpoint,
Class<?>[] interfaceClasses) {
m_serviceEndpoint = serviceEndpoint;
m_interfaceClasses = interfaceClasses;
m_serviceEndpointTopic = DistributionService.REMOTE_TOPIC;
+
+ for (Class<?> interfaceClass : interfaceClasses) {
+ for (Method serviceMethod : interfaceClass.getMethods()) {
+ m_interfaceMethods.add(serviceMethod);
+ }
+ }
+ }
+
+ /********************************************************
+ * Access
+ ********************************************************/
+
+ public ServiceEndPoint getServiceEndPoint() {
+ return m_serviceEndpoint;
}
- /*
- * lifecycle
- */
+ /********************************************************
+ * Service lifecycle
+ ********************************************************/
- public void start() {
+ public synchronized void start() {
m_clusterMessageService.subscribe(this);
}
- public void stop() {
+ public synchronized void stop() {
m_clusterMessageService.unsubscribe(this);
}
- public ServiceEndPoint getServiceEndPoint() {
- return m_serviceEndpoint;
- }
- /*
- * InvocationHandler interface
- */
- public Object invoke(Object proxy, Method method, Object[] args) throws
Throwable {
+ /********************************************************
+ * InvocationHandler
+ ********************************************************/
- if (isServiceIntefaceInvocation(method)) {
+ public Object invoke(Object proxy, Method method, Object[] args) throws
Throwable {
+ if (isServiceInterfaceInvocation(method)) {
String invocationIdentifier = createNewInvocationIdentifier();
Map<String, Object> payload =
getInvocationPayload(invocationIdentifier, method, args);
m_clusterMessageService.publish(m_serviceEndpointTopic, new
EndpointInvokeMessage(m_serviceEndpoint,
@@ -90,9 +111,10 @@
return method.invoke(this, args);
}
- /*
- * ClusterTopicListener interface
- */
+ /********************************************************
+ * ClusterTopicListner
+ ********************************************************/
+
public String getTopic() {
return m_serviceEndpointTopic;
}
@@ -101,19 +123,44 @@
if (message instanceof EndpointResponseMessage) {
EndpointResponseMessage endpointResponseMessage =
(EndpointResponseMessage) message;
Map<String, Object> payload = endpointResponseMessage.getPayload();
- String invocationId = (String)
payload.get(MESSAGE_INVOCATION_ID_KEY);
+ String invocationId = (String)
payload.get(DistributionServiceImpl.MESSAGE_INVOCATION_ID_KEY);
if (ownsInvocationIndentifier(invocationId)) {
- Object response = payload.get(MESSAGE_RESPONSE_MAP_KEY);
+ Object response =
payload.get(DistributionServiceImpl.MESSAGE_INVOCATION_RESPONSE_MAP_KEY);
storeResponseObject(invocationId, response);
removeInvocationIdentifier(invocationId);
- System.err.println("Stored response: " + response);
}
}
}
- /*
+ /********************************************************
+ * Object
+ ********************************************************/
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Proxy) {
+ InvocationHandler invocationHandler =
Proxy.getInvocationHandler(obj);
+ if (invocationHandler instanceof LocalServiceInvocationHandler) {
+ return getServiceEndPoint().equals(
+ ((LocalServiceInvocationHandler)
invocationHandler).getServiceEndPoint());
+ }
+ return false;
+ }
+ if (obj instanceof LocalServiceInvocationHandler) {
+ return getServiceEndPoint().equals(
+ ((LocalServiceInvocationHandler) obj).getServiceEndPoint());
+ }
+ return super.equals(obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return getServiceEndPoint().hashCode();
+ }
+
+ /********************************************************
* private
- */
+ ********************************************************/
private String createNewInvocationIdentifier() {
String invocationId = UUID.randomUUID().toString();
@@ -125,10 +172,13 @@
private Map<String, Object> getInvocationPayload(String invocationId,
Method method, Object[] args) {
Map<String, Object> payload = new HashMap<String, Object>();
- payload.put(MESSAGE_INVOCATION_SERVICEENDPOINTINFO_KEY,
m_serviceEndpoint);
- payload.put(MESSAGE_INVOCATION_ID_KEY, invocationId);
- payload.put(MESSAGE_INVOCATION_METHODNAME_KEY, method.getName());
- payload.put(MESSAGE_INVOCATION_ARGUMENTS_KEY, args);
+ payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ID_KEY,
invocationId);
+ payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_METHODNAME_KEY,
method.getName());
+ payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ARGUMENTS_KEY,
args);
+ payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ORIGIN_CLUSTERID_KEY,
+ m_clusterMemberService.getClusterId());
+ payload.put(DistributionServiceImpl.MESSAGE_INVOCATION_ORIGIN_MEMBERID_KEY,
+ m_clusterMemberService.getMemberId());
return payload;
}
@@ -141,16 +191,14 @@
if (invocationResponseRecieved(invocationId)) {
response = retrieveInvocationResponseObject(invocationId);
isResponseRecieved = true;
- System.err.println("Recieved repsonse for invocation response:
" + invocationId);
}
else {
isResponseTimedOut = isTimedOut(invocationStart);
}
}
if (isResponseTimedOut) {
- System.err.println("Timed out waiting for invocation response: " +
invocationId);
+ // FIXME now what?
}
- System.err.println("Invocation took " + (System.currentTimeMillis() -
invocationStart) + " ms");
return response;
}
@@ -158,44 +206,58 @@
return (System.currentTimeMillis() - invocationStart) >
INVOCATION_TIMEOUT;
}
- private boolean invocationResponseRecieved(String invocationId) {
- synchronized (myInvocationResponses) {
- return myInvocationResponses.containsKey(invocationId);
+ private boolean invocationResponseRecieved(final String invocationId) {
+ m_invocationResponsesLock.readLock().lock();
+ try {
+ return m_invocationResponses.containsKey(invocationId);
+ }
+ finally {
+ m_invocationResponsesLock.readLock().unlock();
}
}
private Object retrieveInvocationResponseObject(final String invocationId)
{
- synchronized (myInvocationResponses) {
- return myInvocationResponses.remove(invocationId);
+ m_invocationResponsesLock.writeLock().lock();
+ try {
+ return m_invocationResponses.remove(invocationId);
+ }
+ finally {
+ m_invocationResponsesLock.writeLock().unlock();
}
}
private boolean ownsInvocationIndentifier(final String invocationId) {
- synchronized (m_invocationIdentifiers) {
+ m_invocationIdentifiersLock.readLock().lock();
+ try {
return m_invocationIdentifiers.contains(invocationId);
}
+ finally {
+ m_invocationIdentifiersLock.readLock().unlock();
+ }
}
private void storeResponseObject(final String invocationId, final Object
payload) {
- synchronized (myInvocationResponses) {
- myInvocationResponses.put(invocationId, payload);
+ m_invocationResponsesLock.writeLock().lock();
+ try {
+ m_invocationResponses.put(invocationId, payload);
+ }
+ finally {
+ m_invocationResponsesLock.writeLock().unlock();
}
}
private void removeInvocationIdentifier(final String invocationId) {
- synchronized (m_invocationIdentifiers) {
+ m_invocationIdentifiersLock.writeLock().lock();
+ try {
m_invocationIdentifiers.remove(invocationId);
}
+ finally {
+ m_invocationIdentifiersLock.writeLock().unlock();
+ }
}
- private boolean isServiceIntefaceInvocation(Method method) {
- for (Class<?> serviceInterfaceClass : m_interfaceClasses) {
- for (Method serviceMethod : serviceInterfaceClass.getMethods()) {
- if (method.equals(serviceMethod)) {
- return true;
- }
- }
- }
- return false;
+ private boolean isServiceInterfaceInvocation(final Method method) {
+ // no lock needed
+ return m_interfaceMethods.contains(method);
}
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
Tue Dec 21 18:33:02 2010
@@ -20,6 +20,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMessageService;
import org.amdatu.core.fabric.cluster.ClusterTopicListener;
@@ -43,8 +44,11 @@
public class DiscoveryServiceImpl implements DiscoveryService,
ClusterTopicListener {
private final Set<ServiceEndPoint> m_remotableEndPoints = new
HashSet<ServiceEndPoint>();
+ private final ReentrantReadWriteLock m_remotableEndPointsLock = new
ReentrantReadWriteLock();
+
private final Map<ServiceEndPoint, Component> m_remoteEndPointComponents =
new HashMap<ServiceEndPoint, Component>();
+ private final ReentrantReadWriteLock m_remoteEndPointComponentsLock = new
ReentrantReadWriteLock();
private volatile ClusterMessageService m_clusterMessageService;
private volatile DependencyManager m_dependencyManager;
@@ -80,7 +84,8 @@
ServiceReference serviceReference, Object
remotableServiceEndpointObject) {
RemotableServiceEndpoint remotableServiceEndpoint =
(RemotableServiceEndpoint) remotableServiceEndpointObject;
ServiceEndPoint serviceEndPoint =
remotableServiceEndpoint.getServiceEndPoint();
- synchronized (m_remotableEndPoints) {
+ m_remotableEndPointsLock.writeLock().lock();
+ try {
if (!m_remotableEndPoints.contains(serviceEndPoint)) {
m_remotableEndPoints.add(serviceEndPoint);
m_clusterMessageService.publish(DISCOVERY_TOPIC, new
EndpointPublishMessage(serviceEndPoint));
@@ -88,7 +93,9 @@
else {
throw new IllegalStateException("Unexpected state... needs
analysis");
}
-
+ }
+ finally {
+ m_remotableEndPointsLock.writeLock().unlock();
}
}
@@ -96,7 +103,9 @@
ServiceReference serviceReference, Object
remotableServiceEndpointObject) {
RemotableServiceEndpoint remotableServiceEndpoint =
(RemotableServiceEndpoint) remotableServiceEndpointObject;
ServiceEndPoint serviceEndPoint =
remotableServiceEndpoint.getServiceEndPoint();
- synchronized (m_remotableEndPoints) {
+
+ m_remotableEndPointsLock.writeLock().lock();
+ try {
if (m_remotableEndPoints.contains(serviceEndPoint)) {
m_clusterMessageService.publish(DISCOVERY_TOPIC, new
EndpointDepublishMessage(serviceEndPoint));
m_remotableEndPoints.remove(serviceEndPoint);
@@ -105,6 +114,9 @@
throw new IllegalStateException("Unexpected state... needs
analysis");
}
}
+ finally {
+ m_remotableEndPointsLock.writeLock().unlock();
+ }
}
/********************************************************
@@ -112,60 +124,80 @@
********************************************************/
public ServiceEndPoint[] getLocalServiceEndPoints() {
- synchronized (m_remotableEndPoints) {
+ m_remotableEndPointsLock.readLock().lock();
+ try {
return m_remotableEndPoints.toArray(new
ServiceEndPoint[m_remotableEndPoints.size()]);
}
+ finally {
+ m_remotableEndPointsLock.readLock().unlock();
+ }
}
public ServiceEndPoint[] getRemoteServiceEndPoints() {
- synchronized (m_remoteEndPointComponents) {
+ m_remoteEndPointComponentsLock.readLock().lock();
+ try {
return m_remoteEndPointComponents.keySet().toArray(
new
ServiceEndPoint[m_remoteEndPointComponents.keySet().size()]);
}
+ finally {
+ m_remoteEndPointComponentsLock.readLock().unlock();
+ }
}
/********************************************************
* ClusterTopicListner
********************************************************/
- // TODO handle LOOKUP
public void recieveMessage(Object message) {
if (message instanceof EndpointPublishMessage) {
EndpointPublishMessage endpointPublishMessage =
(EndpointPublishMessage) message;
ServiceEndPoint serviceEndPoint =
endpointPublishMessage.getServiceEndPoint();
- synchronized (m_remoteEndPointComponents) {
+ m_remoteEndPointComponentsLock.writeLock().lock();
+ try {
if (m_remoteEndPointComponents.containsKey(serviceEndPoint))
return;
Component serviceComponent =
m_dependencyManager.createComponent().setInterface(RemoteServiceEndPoint.class.getName(),
null)
.setImplementation(new
RemoteServiceEndPointImpl(serviceEndPoint));
- // FIXME validate that this depdendency works
serviceComponent.add(m_dependencyManager.createServiceDependency().setService(
DiscoveryService.class,
m_component.getServiceRegistration().getReference()));
m_dependencyManager.add(serviceComponent);
m_remoteEndPointComponents.put(serviceEndPoint,
serviceComponent);
}
+ finally {
+ m_remoteEndPointComponentsLock.writeLock().unlock();
+ }
return;
}
if (message instanceof EndpointDepublishMessage) {
EndpointDepublishMessage endpointDepublishMessage =
(EndpointDepublishMessage) message;
ServiceEndPoint serviceEndPoint =
endpointDepublishMessage.getServiceEndPoint();
- synchronized (m_remoteEndPointComponents) {
+ m_remoteEndPointComponentsLock.writeLock().lock();
+ try {
if (!m_remoteEndPointComponents.containsKey(serviceEndPoint))
return;
Component serviceComponent =
m_remoteEndPointComponents.remove(serviceEndPoint);
m_dependencyManager.remove(serviceComponent);
}
+ finally {
+ m_remoteEndPointComponentsLock.writeLock().unlock();
+ }
return;
}
if (message instanceof EndpointDiscoveryMessage) {
- synchronized (m_remotableEndPoints) {
+ m_remotableEndPointsLock.writeLock().lock();
+ try {
for (ServiceEndPoint serviceEndPoint : m_remotableEndPoints) {
m_clusterMessageService.publish(DISCOVERY_TOPIC, new
EndpointPublishMessage(serviceEndPoint));
}
}
+ finally {
+ m_remotableEndPointsLock.writeLock().unlock();
+ }
return;
}
+ throw new IllegalStateException("Unknown message type " +
message.getClass().getName() + "on channel "
+ + DISCOVERY_TOPIC);
}
public String getTopic() {
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
Tue Dec 21 18:33:02 2010
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
import org.amdatu.core.fabric.cluster.ClusterMessageService;
@@ -44,17 +45,20 @@
public class DistributionServiceImpl implements DistributionService,
ClusterTopicListener {
- private final static String MESSAGE_INVOCATION_ID_KEY = "MII";
- private final static String MESSAGE_INVOCATION_SERVICEENDPOINTINFO_KEY =
"MIEI";
- private final static String MESSAGE_INVOCATION_METHODNAME_KEY = "MIMK";
- private final static String MESSAGE_INVOCATION_ARGUMENTS_KEY = "MIAK";
- private final static String MESSAGE_RESPONSE_MAP_KEY = "MRM";
+ public final static String MESSAGE_INVOCATION_ID_KEY = "A";
+ public final static String MESSAGE_INVOCATION_METHODNAME_KEY = "B";
+ public final static String MESSAGE_INVOCATION_ARGUMENTS_KEY = "C";
+ public final static String MESSAGE_INVOCATION_RESPONSE_MAP_KEY = "D";
+ public final static String MESSAGE_INVOCATION_ORIGIN_CLUSTERID_KEY = "E";
+ public final static String MESSAGE_INVOCATION_ORIGIN_MEMBERID_KEY = "F";
private final Map<ServiceEndPoint, ServiceReferenceComponentTuple>
m_serviceEndPointServiceReferenceComponents =
new HashMap<ServiceEndPoint, ServiceReferenceComponentTuple>();
+ private final ReentrantReadWriteLock
m_serviceEndPointServiceReferenceComponentsLock = new ReentrantReadWriteLock();
private final Map<ServiceEndPoint, Component> m_serviceEndPointComponents =
new HashMap<ServiceEndPoint, Component>();
+ private final ReentrantReadWriteLock m_serviceEndPointComponentsLock = new
ReentrantReadWriteLock();
private volatile ClusterMemberService m_clusterMemberService;
private volatile ClusterMessageService m_clusterMessageService;
@@ -75,6 +79,7 @@
********************************************************/
public synchronized void start() {
+ m_logService.log(LogService.LOG_WARNING, "Starting
DistributionService");
Dictionary<String, Object> distributionProps =
m_component.getServiceProperties();
distributionProps.put(ClusterMemberService.CLUSTER_CLUSTERID_PROP,
m_clusterMemberService.getClusterId());
distributionProps.put(ClusterMemberService.CLUSTER_MEMBERID_PROP,
m_clusterMemberService.getMemberId());
@@ -87,6 +92,7 @@
}
public synchronized void stop() {
+ m_logService.log(LogService.LOG_WARNING, "Stopping
DistributionService");
m_clusterMessageService.unsubscribe(this);
}
@@ -98,7 +104,7 @@
ServiceEndPoint serviceEndPoint =
serviceEndPointFromServiceReference(serviceReference);
if (!isServiceEndpointConfigurationSupported(serviceEndPoint)) {
m_logService
- .log(LogService.LOG_DEBUG, "Unsupported endpoint configuration
" + serviceEndPoint.toString());
+ .log(LogService.LOG_WARNING, "Unsupported ServiceEndPoint
configuration " + serviceEndPoint.toString());
return;
}
Dictionary<String, Object> distributionProperties = new
Hashtable<String, Object>();
@@ -108,28 +114,41 @@
.setImplementation(new
RemotableServiceEndPointImpl(serviceEndPoint));
serviceComponent.add(m_dependencyManager.createServiceDependency().setService(
DistributionService.class,
m_component.getServiceRegistration().getReference()));
- synchronized (m_serviceEndPointServiceReferenceComponents) {
+ m_serviceEndPointServiceReferenceComponentsLock.writeLock().lock();
+ try {
if
(!m_serviceEndPointServiceReferenceComponents.containsKey(serviceEndPoint)) {
m_serviceEndPointServiceReferenceComponents.put(serviceEndPoint,
new ServiceReferenceComponentTuple(
serviceReference, serviceComponent));
m_dependencyManager.add(serviceComponent);
+ m_logService.log(LogService.LOG_WARNING, "Added local
ServiceEndPoint: " + serviceEndPoint.toString());
}
}
+ finally {
+ m_serviceEndPointServiceReferenceComponentsLock.writeLock().unlock();
+ }
}
public void localRemotableServiceRemoved(ServiceReference serviceReference
/* , Object Service */) {
ServiceEndPoint serviceEndPoint =
serviceEndPointFromServiceReference(serviceReference);
- synchronized (m_serviceEndPointServiceReferenceComponents) {
+ m_serviceEndPointServiceReferenceComponentsLock.writeLock().lock();
+ try {
if
(m_serviceEndPointServiceReferenceComponents.containsKey(serviceEndPoint)) {
ServiceReferenceComponentTuple serviceReferenceComponentTuple =
m_serviceEndPointServiceReferenceComponents.remove(serviceEndPoint);
m_dependencyManager.remove(serviceReferenceComponentTuple.getComponent());
+ m_logService
+ .log(LogService.LOG_WARNING, "Removed local
ServiceEndPoint: " + serviceEndPoint.toString());
}
else {
+ m_logService.log(LogService.LOG_WARNING, "Not removed unknown
local ServiceEndPoint: "
+ + serviceEndPoint.toString());
throw new IllegalStateException("Unexpected state... this
needs analysis");
}
}
+ finally {
+ m_serviceEndPointServiceReferenceComponentsLock.writeLock().unlock();
+ }
}
public void remoteServiceEndPointAdded(/* ServiceReference
serviceReference, */Object remoteServiceEndPointObject) {
@@ -138,30 +157,46 @@
Object localServiceProxy =
createLocalServiceInvocationHandler(serviceEndPoint);
if (localServiceProxy != null) {
Component localServiceComponent =
createLocalServiceComponent(serviceEndPoint, localServiceProxy);
- synchronized (m_serviceEndPointComponents) {
+ m_serviceEndPointComponentsLock.writeLock().lock();
+ try {
if (!m_serviceEndPointComponents.containsKey(serviceEndPoint))
{
m_serviceEndPointComponents.put(serviceEndPoint,
localServiceComponent);
m_dependencyManager.add(localServiceComponent);
+ m_logService.log(LogService.LOG_WARNING,
+ "Added remote ServiceEndPoint: " +
serviceEndPoint.toString());
}
else {
+ m_logService.log(LogService.LOG_WARNING,
+ "Not added duplicate remote ServiceEndPoint: " +
serviceEndPoint.toString());
throw new IllegalStateException("Unexpected state... this
needs analysis");
}
}
+ finally {
+ m_serviceEndPointComponentsLock.writeLock().unlock();
+ }
}
}
public void remoteServiceEndPointRemoved(/* ServiceReference
serviceReference, */Object remoteServiceEndPointObject) {
RemoteServiceEndPoint remoteServiceEndPoint = (RemoteServiceEndPoint)
remoteServiceEndPointObject;
ServiceEndPoint serviceEndPoint =
remoteServiceEndPoint.getServiceEndPoint();
- synchronized (m_serviceEndPointComponents) {
+ m_serviceEndPointComponentsLock.writeLock().lock();
+ try {
if (m_serviceEndPointComponents.containsKey(serviceEndPoint)) {
Component localServiceComponent =
m_serviceEndPointComponents.get(serviceEndPoint);
m_dependencyManager.remove(localServiceComponent);
+ m_logService.log(LogService.LOG_WARNING,
+ "Removed remote ServiceEndPoint: " +
serviceEndPoint.toString());
}
else {
+ m_logService.log(LogService.LOG_WARNING,
+ "Not removed unknown remote ServiceEndPoint: " +
serviceEndPoint.toString());
throw new IllegalStateException("Unexpected state... this
needs analysis");
}
}
+ finally {
+ m_serviceEndPointComponentsLock.writeLock().unlock();
+ }
}
/********************************************************
@@ -175,21 +210,33 @@
public void recieveMessage(Object message) {
if (message instanceof EndpointInvokeMessage) {
EndpointInvokeMessage endpointInvokeMessage =
(EndpointInvokeMessage) message;
+ ServiceEndPoint serviceEndPoint =
endpointInvokeMessage.getServiceEndPoint();
Map<String, Object> payload = endpointInvokeMessage.getPayload();
String invocationId = (String)
payload.get(MESSAGE_INVOCATION_ID_KEY);
- ServiceEndPoint serviceEndpoint = (ServiceEndPoint) payload
- .get(MESSAGE_INVOCATION_SERVICEENDPOINTINFO_KEY);
-
+ String originClusterId = (String)
payload.get(MESSAGE_INVOCATION_ORIGIN_CLUSTERID_KEY);
+ String originMemberId = (String)
payload.get(MESSAGE_INVOCATION_ORIGIN_MEMBERID_KEY);
String methodName = (String)
payload.get(MESSAGE_INVOCATION_METHODNAME_KEY);
Object[] args = (Object[])
payload.get(MESSAGE_INVOCATION_ARGUMENTS_KEY);
Class<?>[] types = DistributionUtilities.getTypesFromArgs(args);
Object serviceResponse = null;
- synchronized (m_serviceEndPointServiceReferenceComponents) {
+ m_serviceEndPointServiceReferenceComponentsLock.readLock().lock();
+ try {
ServiceReferenceComponentTuple serviceReferenceComponentTuple =
- m_serviceEndPointServiceReferenceComponents.get(serviceEndpoint);
+ m_serviceEndPointServiceReferenceComponents.get(serviceEndPoint);
if (serviceReferenceComponentTuple == null) {
- // TODO local service gone.. what to do? Send back error
to prevent client from waiting...
+ if
(serviceEndPoint.getMemberId().equals(m_clusterMemberService.getMemberId())) {
+ // TODO local service gone.. what to do? Send back
error to prevent client from waiting...
+ m_logService
+ .log(
+ LogService.LOG_WARNING,
+ "Dropping EndpointInvokeMessage for unknown
ServiceEndPoint: "
+ + serviceEndPoint.toString());
+ }
+ else {
+ m_logService.log(LogService.LOG_WARNING,
+ "Dropping EndpointInvokeMessage for other
clustermember: " + serviceEndPoint.toString());
+ }
return;
}
ServiceReference serviceReference =
serviceReferenceComponentTuple.getServiceReference();
@@ -197,6 +244,11 @@
Object serviceObject =
m_bundleContext.getService(serviceReference);
if (serviceObject == null) {
// TODO local service gone.. what to do? Send back
error to prevent client from waiting...
+ m_logService
+ .log(
+ LogService.LOG_WARNING,
+ "Dropping EndpointInvokeMessage for
unavailable service: "
+ + serviceEndPoint.toString());
return;
}
Method serviceMethod =
serviceObject.getClass().getMethod(methodName, types);
@@ -206,38 +258,47 @@
}
catch (SecurityException e) {
// TODO its fooked.. what to do? Send back error to
prevent client from waiting...
+ m_logService.log(LogService.LOG_ERROR, "Exception during
local service invocation", e);
e.printStackTrace();
}
catch (NoSuchMethodException e) {
// TODO its fooked.. what to do? Send back error to
prevent client from waiting...
+ m_logService.log(LogService.LOG_ERROR, "Exception during
local service invocation", e);
e.printStackTrace();
}
catch (IllegalArgumentException e) {
// TODO its fooked.. what to do? Send back error to
prevent client from waiting...
+ m_logService.log(LogService.LOG_ERROR, "Exception during
local service invocation", e);
e.printStackTrace();
}
catch (IllegalAccessException e) {
// TODO its fooked.. what to do? Send back error to
prevent client from waiting...
+ m_logService.log(LogService.LOG_ERROR, "Exception during
local service invocation", e);
e.printStackTrace();
}
catch (InvocationTargetException e) {
// TODO its fooked.. what to do? Send back error to
prevent client from waiting...
+ m_logService.log(LogService.LOG_ERROR, "Exception during
local service invocation", e);
e.printStackTrace();
}
}
+ finally {
+ m_serviceEndPointServiceReferenceComponentsLock.readLock().unlock();
+ }
if (serviceResponse != null) {
Map<String, Object> responsePayload = new HashMap<String,
Object>();
responsePayload.put(MESSAGE_INVOCATION_ID_KEY, invocationId);
- responsePayload.put(MESSAGE_RESPONSE_MAP_KEY, serviceResponse);
+ responsePayload.put(MESSAGE_INVOCATION_RESPONSE_MAP_KEY,
serviceResponse);
m_clusterMessageService
- .publish(REMOTE_TOPIC, new
EndpointResponseMessage(serviceEndpoint, responsePayload));
+ .publish(REMOTE_TOPIC,
+ new EndpointResponseMessage(originClusterId,
originMemberId, responsePayload));
}
}
}
- /*
+ /********************************************************
* private
- */
+ ********************************************************/
private Object createLocalServiceInvocationHandler(ServiceEndPoint
serviceEndpoint) {
Class<?>[] interfaceClasses = new
Class<?>[serviceEndpoint.getObjectClass().length];
@@ -278,7 +339,8 @@
Component component = m_dependencyManager.createComponent()
.setInterface(serviceEndPoint.getObjectClass(),
registrationProperties)
.setImplementation(serviceObject);
-
+ component.add(m_dependencyManager.createServiceDependency().setService(ClusterMemberService.class)
+ .setRequired(true));
component.add(m_dependencyManager.createServiceDependency().setService(ClusterMessageService.class)
.setRequired(true));
return component;
Modified:
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
(original)
+++
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/DistributionUtilitiesTest.java
Tue Dec 21 18:33:02 2010
@@ -130,6 +130,20 @@
"unknown_intent" });
Assert.assertFalse("Unknown intent specified... cannot be supported",
DistributionUtilities.isExportedIntentsListSupported(props));
+
+ props = new Hashtable<String, Object>();
+ props.put(DistributionService.SERVICE_INTENTS_PROP, new String[] {});
+ props.put(DistributionService.SERVICE_EXPORTED_INTENTS_PROP,
+ new String[] {
DistributionService.DISTRIBUTION_INTENT_CONFIDENTIALITY });
+ props.put(DistributionService.SERVICE_EXPORTED_INTENTS_EXTRA_PROP, new
String[] {});
+ props.put(DistributionService.SERVICE_EXPORTED_INTERFACES_PROP,
+ new String[] { RemotableInterface1.class.getName() });
+ props.put(DistributionService.SERVICE_EXPORTED_CONFIGS_PROP,
+ new String[] { DistributionService.SERVICE_CONFIGURATION_TYPE });
+ props.put("org.amdatu.test.prop", "this property should be published
on imported services");
+ Assert.assertTrue("Unknown intent specified... cannot be supported",
+ DistributionUtilities.isExportedIntentsListSupported(props));
+
}
@Test