Author: bdekruijff at gmail.com Date: Fri Dec 17 16:41:34 2010 New Revision: 512
Log: [sandbox] Initial drop of service fabric implementation Added: sandbox/bdekruijff/fabric/ sandbox/bdekruijff/fabric/pom.xml sandbox/bdekruijff/fabric/src/ sandbox/bdekruijff/fabric/src/main/ sandbox/bdekruijff/fabric/src/main/java/ sandbox/bdekruijff/fabric/src/main/java/org/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMessageListener.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMessageService.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterTopicListener.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/AbstractClusterMemberService.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelCreator.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMemberServiceImpl.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMessageServiceImpl.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/IntrospectionUtils.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/shell/ 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/shell/ClusterMessageCommand.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/test/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/test/ClusterMessageTester.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DiscoveryService.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/RemotableServiceEndpoint.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/RemoteServiceEndPoint.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java 
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/RemotableServiceEndPointImpl.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/RemoteServiceEndPointImpl.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/shell/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/shell/DiscoveryListCommand.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/FabricTestService.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/RemotableService.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/internal/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/internal/RemotableServiceImpl.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/service/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/service/FabricTestServiceImpl.java sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/shell/ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/shell/FabricTestCommand.java sandbox/bdekruijff/fabric/src/test/ sandbox/bdekruijff/fabric/src/test/java/ Added: sandbox/bdekruijff/fabric/pom.xml ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/pom.xml Fri Dec 17 16:41:34 2010 @@ -0,0 +1,54 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.amdatu</groupId> + <artifactId>org.amdatu.core</artifactId> + <version>0.1.0-SNAPSHOT</version> + </parent> 
+ <groupId>org.amdatu.core</groupId> + <artifactId>fabric</artifactId> + <packaging>bundle</packaging> + <name>Amdatu Core - Service Fabric</name> + <description>Amdatu Core - Service Fabric</description> + + <dependencies> + <dependency> + <groupId>org.apache.tomcat</groupId> + <artifactId>tribes</artifactId> + <version>6.0.29</version> + </dependency> + <dependency> + <groupId>org.apache.tomcat</groupId> + <artifactId>juli</artifactId> + <version>6.0.29</version> + </dependency> + <dependency> + <groupId>org.apache.felix</groupId> + <artifactId>org.apache.felix.shell</artifactId> + <type>bundle</type> + <scope>provided</scope> + <version>1.4.2</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <configuration> + <instructions> + <Bundle-Activator>org.amdatu.core.fabric.osgi.Activator</Bundle-Activator> + <Bundle-SymbolicName>org.amdatu.core.fabric</Bundle-SymbolicName> + <Export-Package>org.amdatu.core.fabric.cluster, org.amdatu.core.fabric.remote</Export-Package> + <Embed-Dependency>*;scope=compile</Embed-Dependency> + <Embed-Transitive>true</Embed-Transitive> + </instructions> + </configuration> + </plugin> + </plugins> + </build> + +</project> Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,22 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.amdatu.core.fabric.cluster; + +public interface ClusterMember { + + String getId(); +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,39 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.cluster; + +import java.util.Dictionary; + +public interface ClusterMemberService { + + final static String CLUSTER_CLUSTERID_PROP = "org.amdatu.fabric.cluster.CLUSTERID"; + final static String CLUSTER_MEMBERID_PROP = "org.amdatu.fabric.cluster.MEMBERID"; + + String getClusterId(); + + String getMemberId(); + + Dictionary<String, Object> getProperties(); + + ClusterMember[] getClusterMembers(); + + void broadcast(Object message); + + void subscribe(ClusterMessageListener clusterMessageListener); + + void unsubscribe(ClusterMessageListener clusterMessageListener); +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMessageListener.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMessageListener.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,22 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.cluster; + +public interface ClusterMessageListener { + + void recieveMessage(Object message); +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMessageService.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMessageService.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,26 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.cluster; + +public interface ClusterMessageService { + + void publish(String topic, Object message); + + void subscribe(ClusterTopicListener clusterTopicListener); + + void unsubscribe(ClusterTopicListener clusterTopicListener); +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterTopicListener.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterTopicListener.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,24 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.cluster; + +public interface ClusterTopicListener { + + String getTopic(); + + void recieveMessage(Object message); +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/AbstractClusterMemberService.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/AbstractClusterMemberService.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,128 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.cluster.internal; + +import java.util.Dictionary; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Map; +import java.util.Set; + +import org.amdatu.core.fabric.cluster.ClusterMember; +import org.amdatu.core.fabric.cluster.ClusterMemberService; +import org.amdatu.core.fabric.cluster.ClusterMessageListener; + +/** + * I manage cluster state + */ +public abstract class AbstractClusterMemberService implements ClusterMemberService { + + private final Set<ClusterMember> m_clusterMembers = new HashSet<ClusterMember>(); + + private final Set<ClusterMessageListener> m_clusterMessageListeners = new HashSet<ClusterMessageListener>(); + + private final String m_clusterId; + private final String m_memberId; + private final Map<String, Object> m_properties; + + public AbstractClusterMemberService(String clusterId, String memberId, Dictionary<String, Object> properties) { + m_clusterId = clusterId; + m_memberId = memberId; + m_properties = new HashMap<String, Object>(); + if (properties != null) { + Enumeration<String> enumeration = properties.keys(); + while (enumeration.hasMoreElements()) { + String key = (String) enumeration.nextElement(); + m_properties.put(key, properties.get(key)); + } + } + } + + /* + * ClusterMemberService interface + */ + + public final String getClusterId() { + return m_clusterId; + } + + public final String getMemberId() { + return m_memberId; + } + + public final Dictionary<String, Object> getProperties() { + return new Hashtable<String, Object>(m_properties); + + } + + public final ClusterMember[] getClusterMembers() { + // TODO defend + return m_clusterMembers.toArray(new ClusterMember[m_clusterMembers.size()]); + } + + public final void broadcast(Object message) { + doBroadcast(message); + } + + public final void subscribe(ClusterMessageListener clusterMessageListener) { + synchronized (m_clusterMessageListeners) { + 
m_clusterMessageListeners.add(clusterMessageListener); + } + onSubscribe(clusterMessageListener); + } + + public final void unsubscribe(ClusterMessageListener clusterMessageListener) { + synchronized (m_clusterMessageListeners) { + m_clusterMessageListeners.remove(clusterMessageListener); + } + onUnsubscribe(clusterMessageListener); + } + + /* + * SPI + */ + protected final void addClusterMember(ClusterMember clusterMember) { + synchronized (m_clusterMembers) { + m_clusterMembers.add(clusterMember); + } + } + + protected final void removeClusterMember(ClusterMember clusterMember) { + synchronized (m_clusterMembers) { + m_clusterMembers.remove(clusterMember); + } + } + + protected final void dispatchMessage(Object message) { + // TODO async + synchronized (m_clusterMessageListeners) { + for (ClusterMessageListener clm : m_clusterMessageListeners) { + clm.recieveMessage(message); + } + } + } + + protected void onSubscribe(ClusterMessageListener clusterMessageListener) { + } + + protected void onUnsubscribe(ClusterMessageListener clusterMessageListener) { + } + + public abstract void doBroadcast(Object message); +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,42 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.amdatu.core.fabric.cluster.internal; + +import org.amdatu.core.fabric.cluster.ClusterMember; + +public class ClusterMemberImpl implements ClusterMember { + + public final String m_id; + + public ClusterMemberImpl(final String id) { + m_id = id; + } + + public String getId() { + return m_id; + } + + @Override + public boolean equals(Object obj) { + return m_id.equals(((ClusterMemberImpl) obj).getId()); + } + + @Override + public int hashCode() { + return m_id.hashCode(); + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelCreator.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelCreator.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,258 @@ +package org.amdatu.core.fabric.cluster.internal.tribes; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Properties; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ManagedChannel; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor; +import org.apache.catalina.tribes.group.interceptors.FragmentationInterceptor; +import org.apache.catalina.tribes.group.interceptors.GzipInterceptor; +import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; +import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; +import org.apache.catalina.tribes.group.interceptors.OrderInterceptor; +import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor; +import 
org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; +import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor; +import org.apache.catalina.tribes.membership.McastService; +import org.apache.catalina.tribes.membership.MemberImpl; +import org.apache.catalina.tribes.transport.MultiPointSender; +import org.apache.catalina.tribes.transport.ReceiverBase; +import org.apache.catalina.tribes.transport.ReplicationTransmitter; + +/** + * FIXME copy of http://svn.apache.org/repos/asf/tomcat/tc7.0.x/tags/TOMCAT_7_0_5/test/org/apache/catalina/tribes/demos/ChannelCreator.java + * + */ +public class ChannelCreator { + + public static StringBuilder usage() { + StringBuilder buf = new StringBuilder(); + buf.append("\n\t\t[-bind tcpbindaddress]") + .append("\n\t\t[-tcpselto tcpselectortimeout]") + .append("\n\t\t[-tcpthreads tcpthreadcount]") + .append("\n\t\t[-port tcplistenport]") + .append("\n\t\t[-autobind tcpbindtryrange]") + .append("\n\t\t[-ackto acktimeout]") + .append( + "\n\t\t[-receiver org.apache.catalina.tribes.transport.nio.NioReceiver|org.apache.catalina.tribes.transport.bio.BioReceiver|]") + .append( + "\n\t\t[-transport org.apache.catalina.tribes.transport.nio.PooledParallelSender|org.apache.catalina.tribes.transport.bio.PooledMultiSender]") + .append("\n\t\t[-transport.xxx transport specific property]") + .append("\n\t\t[-maddr multicastaddr]") + .append("\n\t\t[-mport multicastport]") + .append("\n\t\t[-mbind multicastbindaddr]") + .append("\n\t\t[-mfreq multicastfrequency]") + .append("\n\t\t[-mdrop multicastdroptime]") + .append("\n\t\t[-gzip]") + .append("\n\t\t[-static hostname:port (-static localhost:9999 -static 127.0.0.1:8888 can be repeated)]") + .append("\n\t\t[-order]") + .append("\n\t\t[-ordersize maxorderqueuesize]") + .append("\n\t\t[-frag]") + .append("\n\t\t[-fragsize maxmsgsize]") + .append("\n\t\t[-throughput]") + .append("\n\t\t[-failuredetect]") + .append("\n\t\t[-async]") + .append("\n\t\t[-asyncsize 
maxqueuesizeinkilobytes]"); + return buf; + + } + + public static Channel createChannel(String[] args) throws Exception { + String bind = "auto"; + int port = 4001; + String mbind = null; + boolean gzip = false; + int tcpseltimeout = 5000; + int tcpthreadcount = 4; + int acktimeout = 15000; + String mcastaddr = "228.0.0.5"; + int mcastport = 45565; + long mcastfreq = 500; + long mcastdrop = 2000; + boolean order = false; + int ordersize = Integer.MAX_VALUE; + boolean frag = false; + int fragsize = 1024; + int autoBind = 10; + ArrayList<Member> staticMembers = new ArrayList<Member>(); + Properties transportProperties = new Properties(); + String transport = "org.apache.catalina.tribes.transport.nio.PooledParallelSender"; + String receiver = "org.apache.catalina.tribes.transport.nio.NioReceiver"; + boolean async = false; + int asyncsize = 1024 * 1024 * 50; // 50MB + boolean throughput = false; + boolean failuredetect = false; + + for (int i = 0; i < args.length; i++) { + if ("-bind".equals(args[i])) { + bind = args[++i]; + } + else if ("-port".equals(args[i])) { + port = Integer.parseInt(args[++i]); + } + else if ("-autobind".equals(args[i])) { + autoBind = Integer.parseInt(args[++i]); + } + else if ("-tcpselto".equals(args[i])) { + tcpseltimeout = Integer.parseInt(args[++i]); + } + else if ("-tcpthreads".equals(args[i])) { + tcpthreadcount = Integer.parseInt(args[++i]); + } + else if ("-gzip".equals(args[i])) { + gzip = true; + } + else if ("-async".equals(args[i])) { + async = true; + } + else if ("-failuredetect".equals(args[i])) { + failuredetect = true; + } + else if ("-asyncsize".equals(args[i])) { + asyncsize = Integer.parseInt(args[++i]); + System.out.println("Setting MessageDispatchInterceptor.maxQueueSize=" + asyncsize); + } + else if ("-static".equals(args[i])) { + String d = args[++i]; + String h = d.substring(0, d.indexOf(":")); + String p = d.substring(h.length() + 1); + MemberImpl m = new MemberImpl(h, Integer.parseInt(p), 2000); + 
staticMembers.add(m); + } + else if ("-throughput".equals(args[i])) { + throughput = true; + } + else if ("-order".equals(args[i])) { + order = true; + } + else if ("-ordersize".equals(args[i])) { + ordersize = Integer.parseInt(args[++i]); + System.out.println("Setting OrderInterceptor.maxQueue=" + ordersize); + } + else if ("-frag".equals(args[i])) { + frag = true; + } + else if ("-fragsize".equals(args[i])) { + fragsize = Integer.parseInt(args[++i]); + System.out.println("Setting FragmentationInterceptor.maxSize=" + fragsize); + } + else if ("-ackto".equals(args[i])) { + acktimeout = Integer.parseInt(args[++i]); + } + else if ("-transport".equals(args[i])) { + transport = args[++i]; + } + else if (args[i] != null && args[i].startsWith("transport.")) { + String key = args[i]; + String val = args[++i]; + transportProperties.setProperty(key, val); + } + else if ("-receiver".equals(args[i])) { + receiver = args[++i]; + } + else if ("-maddr".equals(args[i])) { + mcastaddr = args[++i]; + } + else if ("-mport".equals(args[i])) { + mcastport = Integer.parseInt(args[++i]); + } + else if ("-mfreq".equals(args[i])) { + mcastfreq = Long.parseLong(args[++i]); + } + else if ("-mdrop".equals(args[i])) { + mcastdrop = Long.parseLong(args[++i]); + } + else if ("-mbind".equals(args[i])) { + mbind = args[++i]; + } + } + + System.out.println("Creating receiver class=" + receiver); + Class<?> cl = Class.forName(receiver, true, + ChannelCreator.class.getClassLoader()); + ReceiverBase rx = (ReceiverBase) cl.newInstance(); + rx.setAddress(bind); + rx.setPort(port); + rx.setSelectorTimeout(tcpseltimeout); + rx.setMaxThreads(tcpthreadcount); + rx.setMinThreads(tcpthreadcount); + rx.getBind(); + rx.setRxBufSize(43800); + rx.setTxBufSize(25188); + rx.setAutoBind(autoBind); + + ReplicationTransmitter ps = new ReplicationTransmitter(); + System.out.println("Creating transport class=" + transport); + MultiPointSender sender = + (MultiPointSender) Class.forName(transport, true, 
ChannelCreator.class.getClassLoader()).newInstance(); + sender.setTimeout(acktimeout); + sender.setMaxRetryAttempts(2); + sender.setRxBufSize(43800); + sender.setTxBufSize(25188); + + Iterator<Object> i = transportProperties.keySet().iterator(); + while (i.hasNext()) { + String key = (String) i.next(); + IntrospectionUtils.setProperty(sender, key, transportProperties.getProperty(key)); + } + ps.setTransport(sender); + + McastService service = new McastService(); + service.setAddress(mcastaddr); + if (mbind != null) + service.setMcastBindAddress(mbind); + service.setFrequency(mcastfreq); + service.setMcastDropTime(mcastdrop); + service.setPort(mcastport); + + ManagedChannel channel = new GroupChannel(); + channel.setChannelReceiver(rx); + channel.setChannelSender(ps); + channel.setMembershipService(service); + + if (throughput) + channel.addInterceptor(new ThroughputInterceptor()); + if (gzip) + channel.addInterceptor(new GzipInterceptor()); + if (frag) { + FragmentationInterceptor fi = new FragmentationInterceptor(); + fi.setMaxSize(fragsize); + channel.addInterceptor(fi); + } + if (order) { + OrderInterceptor oi = new OrderInterceptor(); + oi.setMaxQueue(ordersize); + channel.addInterceptor(oi); + } + + if (async) { + MessageDispatchInterceptor mi = new MessageDispatch15Interceptor(); + mi.setMaxQueueSize(asyncsize); + channel.addInterceptor(mi); + System.out.println("Added MessageDispatchInterceptor"); + } + + if (failuredetect) { + TcpFailureDetector tcpfi = new TcpFailureDetector(); + channel.addInterceptor(tcpfi); + } + if (staticMembers.size() > 0) { + StaticMembershipInterceptor smi = new StaticMembershipInterceptor(); + for (int x = 0; x < staticMembers.size(); x++) { + smi.addStaticMember(staticMembers.get(x)); + } + channel.addInterceptor(smi); + } + + byte[] domain = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 }; + ((McastService) channel.getMembershipService()).setDomain(domain); + DomainFilterInterceptor filter = new DomainFilterInterceptor(); + 
filter.setDomain(domain); + channel.addInterceptor(filter); + return channel; + } +} \ No newline at end of file Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMemberServiceImpl.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMemberServiceImpl.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,155 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.cluster.internal.tribes; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.Dictionary; +import java.util.Properties; + +import org.amdatu.core.fabric.cluster.internal.AbstractClusterMemberService; +import org.amdatu.core.fabric.cluster.internal.ClusterMemberImpl; +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.ManagedChannel; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.util.Arrays; +import org.apache.catalina.tribes.util.UUIDGenerator; + +public final class ClusterMemberServiceImpl extends AbstractClusterMemberService implements MembershipListener, + ChannelListener { + + public static final String CLUSTER_TRIBES_ARGS_PROP = "org.amdatu.fabric.cluster.tribes.args"; + + private ManagedChannel m_managedChannel; + + public ClusterMemberServiceImpl(String clusterId, String memberId, Dictionary<String, Object> properties) { + super(clusterId, memberId, properties); + } + + /* + * lifecycle + */ + + public void start() { + try { + m_managedChannel = + (ManagedChannel) ChannelCreator.createChannel((String[]) getProperties().get( + CLUSTER_TRIBES_ARGS_PROP)); + + Properties props = new Properties(); + props.setProperty(CLUSTER_MEMBERID_PROP, Arrays.toString(UUIDGenerator.randomUUID(true))); + + m_managedChannel.addMembershipListener(this); + m_managedChannel.addChannelListener(this); + m_managedChannel.getMembershipService().setPayload(getPayload(props)); + +// m_managedChannel.getMembershipService().setDomain("Amdatu".getBytes(Charset.forName("ISO-8859-1"))); + + m_managedChannel.start(Channel.DEFAULT); + } + catch (Exception e) { + e.printStackTrace(); + } + } + + public void stop() { + try { + 
m_managedChannel.stop(Channel.DEFAULT); + } + catch (Exception x) { + x.printStackTrace(); + } + } + + /* + * ClusterMemberService interface + */ + + @Override + public void doBroadcast(Object message) { + // TODO check and wrap message. Look into send options + if (message instanceof Serializable) { + try { + m_managedChannel.send(m_managedChannel.getMembers(), (Serializable) message, + Channel.SEND_OPTIONS_ASYNCHRONOUS); + } + catch (ChannelException e) { + e.printStackTrace(); + } + } + } + + /* + * MembershipListener interface + */ + + public void memberAdded(Member member) { + try { + System.out.println("Received member added:" + member); + addClusterMember(new ClusterMemberImpl(getProperties(member.getPayload()) + .getProperty(CLUSTER_MEMBERID_PROP))); + } + catch (Exception x) { + x.printStackTrace(); + } + } + + public void memberDisappeared(Member member) { + try { + System.out.println("Received member disappeared:" + member); + removeClusterMember(new ClusterMemberImpl(getProperties(member.getPayload()) + .getProperty(CLUSTER_MEMBERID_PROP))); + } + catch (Exception x) { + x.printStackTrace(); + } + } + + /* + * ChannelListener interface + */ + + public boolean accept(Serializable message, Member arg1) { + return true; + } + + public void messageReceived(Serializable message, Member member) { + dispatchMessage(message); + } + + /* + * private methods + */ + + private byte[] getPayload(Properties props) throws IOException { + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + props.store(bout, ""); + return bout.toByteArray(); + } + + private Properties getProperties(byte[] payload) throws IOException { + ByteArrayInputStream bin = new ByteArrayInputStream(payload); + Properties props = new Properties(); + props.load(bin); + return props; + } +} \ No newline at end of file Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMessageServiceImpl.java 
============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterMessageServiceImpl.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,130 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.amdatu.core.fabric.cluster.internal.tribes; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.amdatu.core.fabric.cluster.ClusterMemberService; +import org.amdatu.core.fabric.cluster.ClusterMessageListener; +import org.amdatu.core.fabric.cluster.ClusterMessageService; +import org.amdatu.core.fabric.cluster.ClusterTopicListener; + +public class ClusterMessageServiceImpl implements ClusterMessageService { + + private final Map<String, Set<ClusterTopicListener>> m_clusterTopicListeners = + new HashMap<String, Set<ClusterTopicListener>>(); + + // injected + private volatile ClusterMemberService m_clusterMemberService; + + private ClusterMessageListener m_clusterMessageListener; + + /* + * service lifecycle + */ + + public synchronized void start() { + m_clusterMessageListener = new ClusterMessageListener() { + public void recieveMessage(Object message) { + if (message instanceof TopicMessageWrapper) { + dispatchMessages(((TopicMessageWrapper) 
message).getTopic(), + ((TopicMessageWrapper) message).getMessage()); + } + } + }; + m_clusterMemberService.subscribe(m_clusterMessageListener); + } + + public synchronized void stop() { + m_clusterMemberService.unsubscribe(m_clusterMessageListener); + } + + /* + * API + */ + + public void publish(String topic, Object message) { + m_clusterMemberService.broadcast(new TopicMessageWrapper(topic, message)); + } + + public void subscribe(ClusterTopicListener clusterTopicListener) { + if (clusterTopicListener.getTopic() == null || clusterTopicListener.getTopic().equals("")) { + return; + } + synchronized (m_clusterTopicListeners) { + if (!m_clusterTopicListeners.containsKey(clusterTopicListener.getTopic())) { + m_clusterTopicListeners.put(clusterTopicListener.getTopic(), new HashSet<ClusterTopicListener>()); + } + m_clusterTopicListeners.get(clusterTopicListener.getTopic()).add(clusterTopicListener); + } + } + + public void unsubscribe(ClusterTopicListener clusterTopicListener) { + if (clusterTopicListener.getTopic() == null || clusterTopicListener.getTopic().equals("")) { + return; + } + synchronized (m_clusterTopicListeners) { + if (m_clusterTopicListeners.containsKey(clusterTopicListener.getTopic())) { + m_clusterTopicListeners.get(clusterTopicListener.getTopic()).remove(clusterTopicListener); + } + } + } + + /* + * private + */ + + private void dispatchMessages(String topic, Object message) { + if (topic == null || topic.equals("")) { + return; + } + synchronized (m_clusterTopicListeners) { + if (m_clusterTopicListeners.containsKey(topic)) { + // TODO async + for (ClusterTopicListener clusterTopicListener : m_clusterTopicListeners.get(topic)) { + // TODO clone? 
+ clusterTopicListener.recieveMessage(message); + } + } + } + } +} + +class TopicMessageWrapper implements Serializable { + + private static final long serialVersionUID = 5182436896635654588L; + + private String m_topic; + private Object m_message; + + public TopicMessageWrapper(String topic, Object message) { + m_topic = topic; + m_message = message; + } + + public String getTopic() { + return m_topic; + } + + public Object getMessage() { + return m_message; + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/IntrospectionUtils.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/IntrospectionUtils.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,924 @@ +package org.amdatu.core.fabric.cluster.internal.tribes; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.UnknownHostException; +import java.util.Hashtable; +import java.util.StringTokenizer; +import java.util.Vector; + +/** + * FIXME copy of http://svn.apache.org/repos/asf/tomcat/tc7.0.x/tags/TOMCAT_7_0_5/test/org/apache/catalina/tribes/demos/IntrospectionUtils.java + * + */ +public final class IntrospectionUtils { + + private static final org.apache.juli.logging.Log log = + org.apache.juli.logging.LogFactory.getLog(IntrospectionUtils.class); + + /** + * Call execute() - any ant-like task should work + */ + public static void execute(Object proxy, String method) throws Exception { + Method executeM = null; + Class<?> c = proxy.getClass(); + Class<?> params[] = new Class[0]; + // params[0]=args.getClass(); + executeM = findMethod(c, method, params); + if (executeM == null) { + throw new RuntimeException("No 
execute in " + proxy.getClass()); + } + executeM.invoke(proxy, (Object[]) null);// new Object[] { args }); + } + + /** + * Call void setAttribute( String ,Object ) + */ + public static void setAttribute(Object proxy, String n, Object v) + throws Exception { + if (proxy instanceof AttributeHolder) { + ((AttributeHolder) proxy).setAttribute(n, v); + return; + } + + Method executeM = null; + Class<?> c = proxy.getClass(); + Class<?> params[] = new Class[2]; + params[0] = String.class; + params[1] = Object.class; + executeM = findMethod(c, "setAttribute", params); + if (executeM == null) { + if (log.isDebugEnabled()) + log.debug("No setAttribute in " + proxy.getClass()); + return; + } + if (log.isDebugEnabled()) + log.debug("Setting " + n + "=" + v + " in " + proxy); + executeM.invoke(proxy, new Object[] { n, v }); + return; + } + + /** + * Call void getAttribute( String ) + */ + public static Object getAttribute(Object proxy, String n) throws Exception { + Method executeM = null; + Class<?> c = proxy.getClass(); + Class<?> params[] = new Class[1]; + params[0] = String.class; + executeM = findMethod(c, "getAttribute", params); + if (executeM == null) { + if (log.isDebugEnabled()) + log.debug("No getAttribute in " + proxy.getClass()); + return null; + } + return executeM.invoke(proxy, new Object[] { n }); + } + + /** + * Construct a URLClassLoader. Will compile and work in JDK1.1 too. 
+ */ + public static ClassLoader getURLClassLoader(URL urls[], ClassLoader parent) { + try { + Class<?> urlCL = Class.forName("java.net.URLClassLoader"); + Class<?> paramT[] = new Class[2]; + paramT[0] = urls.getClass(); + paramT[1] = ClassLoader.class; + Method m = findMethod(urlCL, "newInstance", paramT); + if (m == null) + return null; + + ClassLoader cl = (ClassLoader) m.invoke(urlCL, new Object[] { urls, + parent }); + return cl; + } + catch (ClassNotFoundException ex) { + // jdk1.1 + return null; + } + catch (Exception ex) { + ex.printStackTrace(); + return null; + } + } + + public static String guessInstall(String installSysProp, + String homeSysProp, String jarName) { + return guessInstall(installSysProp, homeSysProp, jarName, null); + } + + /** + * Guess a product install/home by analyzing the class path. It works for + * product using the pattern: lib/executable.jar or if executable.jar is + * included in classpath by a shell script. ( java -jar also works ) + * + * Insures both "install" and "home" System properties are set. If either or + * both System properties are unset, "install" and "home" will be set to the + * same value. This value will be the other System property that is set, or + * the guessed value if neither is set. 
+ */ + public static String guessInstall(String installSysProp, + String homeSysProp, String jarName, String classFile) { + String install = null; + String home = null; + + if (installSysProp != null) + install = System.getProperty(installSysProp); + + if (homeSysProp != null) + home = System.getProperty(homeSysProp); + + if (install != null) { + if (home == null) + System.getProperties().put(homeSysProp, install); + return install; + } + + // Find the directory where jarName.jar is located + + String cpath = System.getProperty("java.class.path"); + String pathSep = System.getProperty("path.separator"); + StringTokenizer st = new StringTokenizer(cpath, pathSep); + while (st.hasMoreTokens()) { + String path = st.nextToken(); + // log( "path " + path ); + if (path.endsWith(jarName)) { + home = path.substring(0, path.length() - jarName.length()); + try { + if ("".equals(home)) { + home = new File("./").getCanonicalPath(); + } + else if (home.endsWith(File.separator)) { + home = home.substring(0, home.length() - 1); + } + File f = new File(home); + String parentDir = f.getParent(); + if (parentDir == null) + parentDir = home; // unix style + File f1 = new File(parentDir); + install = f1.getCanonicalPath(); + if (installSysProp != null) + System.getProperties().put(installSysProp, install); + if (home == null && homeSysProp != null) + System.getProperties().put(homeSysProp, install); + return install; + } + catch (Exception ex) { + ex.printStackTrace(); + } + } + else { + String fname = path + (path.endsWith("/") ? 
"" : "/") + + classFile; + if (new File(fname).exists()) { + try { + File f = new File(path); + String parentDir = f.getParent(); + if (parentDir == null) + parentDir = path; // unix style + File f1 = new File(parentDir); + install = f1.getCanonicalPath(); + if (installSysProp != null) + System.getProperties().put(installSysProp, install); + if (home == null && homeSysProp != null) + System.getProperties().put(homeSysProp, install); + return install; + } + catch (Exception ex) { + ex.printStackTrace(); + } + } + } + } + + // if install directory can't be found, use home as the default + if (home != null) { + System.getProperties().put(installSysProp, home); + return home; + } + + return null; + } + + /** + * Debug method, display the classpath + */ + public static void displayClassPath(String msg, URL[] cp) { + if (log.isDebugEnabled()) { + log.debug(msg); + for (int i = 0; i < cp.length; i++) { + log.debug(cp[i].getFile()); + } + } + } + + public static final String PATH_SEPARATOR = + System.getProperty("path.separator"); + + /** + * Adds classpath entries from a vector of URL's to the "tc_path_add" System + * property. This System property lists the classpath entries common to web + * applications. This System property is currently used by Jasper when its + * JSP servlet compiles the Java file for a JSP. + */ + public static String classPathAdd(URL urls[], String cp) { + if (urls == null) + return cp; + + for (int i = 0; i < urls.length; i++) { + if (cp != null) + cp += PATH_SEPARATOR + urls[i].getFile(); + else + cp = urls[i].getFile(); + } + return cp; + } + + /** + * Find a method with the right name If found, call the method ( if param is + * int or boolean we'll convert value to the right type before) - that means + * you can have setDebug(1). 
+ */ + public static boolean setProperty(Object o, String name, String value) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: setProperty(" + + o.getClass() + " " + name + "=" + value + ")"); + + String setter = "set" + capitalize(name); + + try { + Method methods[] = findMethods(o.getClass()); + Method setPropertyMethodVoid = null; + Method setPropertyMethodBool = null; + + // First, the ideal case - a setFoo( String ) method + for (int i = 0; i < methods.length; i++) { + Class<?> paramT[] = methods[i].getParameterTypes(); + if (setter.equals(methods[i].getName()) && paramT.length == 1 + && "java.lang.String".equals(paramT[0].getName())) { + + methods[i].invoke(o, new Object[] { value }); + return true; + } + } + + // Try a setFoo ( int ) or ( boolean ) + for (int i = 0; i < methods.length; i++) { + boolean ok = true; + if (setter.equals(methods[i].getName()) + && methods[i].getParameterTypes().length == 1) { + + // match - find the type and invoke it + Class<?> paramType = methods[i].getParameterTypes()[0]; + Object params[] = new Object[1]; + + // Try a setFoo ( int ) + if ("java.lang.Integer".equals(paramType.getName()) + || "int".equals(paramType.getName())) { + try { + params[0] = new Integer(value); + } + catch (NumberFormatException ex) { + ok = false; + } + // Try a setFoo ( long ) + } + else if ("java.lang.Long".equals(paramType.getName()) + || "long".equals(paramType.getName())) { + try { + params[0] = new Long(value); + } + catch (NumberFormatException ex) { + ok = false; + } + + // Try a setFoo ( boolean ) + } + else if ("java.lang.Boolean".equals(paramType.getName()) + || "boolean".equals(paramType.getName())) { + params[0] = new Boolean(value); + + // Try a setFoo ( InetAddress ) + } + else if ("java.net.InetAddress".equals(paramType + .getName())) { + try { + params[0] = InetAddress.getByName(value); + } + catch (UnknownHostException exc) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: Unable to resolve host name:" + 
value); + ok = false; + } + + // Unknown type + } + else { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: Unknown type " + + paramType.getName()); + } + + if (ok) { + methods[i].invoke(o, params); + return true; + } + } + + // save "setProperty" for later + if ("setProperty".equals(methods[i].getName())) { + if (methods[i].getReturnType() == Boolean.TYPE) { + setPropertyMethodBool = methods[i]; + } + else { + setPropertyMethodVoid = methods[i]; + } + + } + } + + // Ok, no setXXX found, try a setProperty("name", "value") + if (setPropertyMethodBool != null || setPropertyMethodVoid != null) { + Object params[] = new Object[2]; + params[0] = name; + params[1] = value; + if (setPropertyMethodBool != null) { + try { + return (Boolean) setPropertyMethodBool.invoke(o, params); + } + catch (IllegalArgumentException biae) { + // the boolean method had the wrong + // parameter types. lets try the other + if (setPropertyMethodVoid != null) { + setPropertyMethodVoid.invoke(o, params); + return true; + } + else { + throw biae; + } + } + } + else { + setPropertyMethodVoid.invoke(o, params); + return true; + } + } + + } + catch (IllegalArgumentException ex2) { + log.warn("IAE " + o + " " + name + " " + value, ex2); + } + catch (SecurityException ex1) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: SecurityException for " + + o.getClass() + " " + name + "=" + value + ")", ex1); + } + catch (IllegalAccessException iae) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: IllegalAccessException for " + + o.getClass() + " " + name + "=" + value + ")", iae); + } + catch (InvocationTargetException ie) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: InvocationTargetException for " + + o.getClass() + " " + name + "=" + value + ")", ie); + } + return false; + } + + public static Object getProperty(Object o, String name) { + String getter = "get" + capitalize(name); + String isGetter = "is" + capitalize(name); + + try { + Method 
methods[] = findMethods(o.getClass()); + Method getPropertyMethod = null; + + // First, the ideal case - a getFoo() method + for (int i = 0; i < methods.length; i++) { + Class<?> paramT[] = methods[i].getParameterTypes(); + if (getter.equals(methods[i].getName()) && paramT.length == 0) { + return methods[i].invoke(o, (Object[]) null); + } + if (isGetter.equals(methods[i].getName()) && paramT.length == 0) { + return methods[i].invoke(o, (Object[]) null); + } + + if ("getProperty".equals(methods[i].getName())) { + getPropertyMethod = methods[i]; + } + } + + // Ok, no setXXX found, try a getProperty("name") + if (getPropertyMethod != null) { + Object params[] = new Object[1]; + params[0] = name; + return getPropertyMethod.invoke(o, params); + } + + } + catch (IllegalArgumentException ex2) { + log.warn("IAE " + o + " " + name, ex2); + } + catch (SecurityException ex1) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: SecurityException for " + + o.getClass() + " " + name + ")", ex1); + } + catch (IllegalAccessException iae) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: IllegalAccessException for " + + o.getClass() + " " + name + ")", iae); + } + catch (InvocationTargetException ie) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: InvocationTargetException for " + + o.getClass() + " " + name + ")"); + } + return null; + } + + /** + */ + public static void setProperty(Object o, String name) { + String setter = "set" + capitalize(name); + try { + Method methods[] = findMethods(o.getClass()); + // find setFoo() method + for (int i = 0; i < methods.length; i++) { + Class<?> paramT[] = methods[i].getParameterTypes(); + if (setter.equals(methods[i].getName()) && paramT.length == 0) { + methods[i].invoke(o, new Object[] {}); + return; + } + } + } + catch (Exception ex1) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: Exception for " + + o.getClass() + " " + name, ex1); + } + } + + /** + * Replace ${NAME} with the 
property value + */ + public static String replaceProperties(String value, + Hashtable<Object, Object> staticProp, PropertySource dynamicProp[]) { + if (value.indexOf("$") < 0) { + return value; + } + StringBuilder sb = new StringBuilder(); + int prev = 0; + // assert value!=nil + int pos; + while ((pos = value.indexOf("$", prev)) >= 0) { + if (pos > 0) { + sb.append(value.substring(prev, pos)); + } + if (pos == (value.length() - 1)) { + sb.append('$'); + prev = pos + 1; + } + else if (value.charAt(pos + 1) != '{') { + sb.append('$'); + prev = pos + 1; // XXX + } + else { + int endName = value.indexOf('}', pos); + if (endName < 0) { + sb.append(value.substring(pos)); + prev = value.length(); + continue; + } + String n = value.substring(pos + 2, endName); + String v = null; + if (staticProp != null) { + v = (String) staticProp.get(n); + } + if (v == null && dynamicProp != null) { + for (int i = 0; i < dynamicProp.length; i++) { + v = dynamicProp[i].getProperty(n); + if (v != null) { + break; + } + } + } + if (v == null) + v = "${" + n + "}"; + + sb.append(v); + prev = endName + 1; + } + } + if (prev < value.length()) + sb.append(value.substring(prev)); + return sb.toString(); + } + + /** + * Reverse of Introspector.decapitalize + */ + public static String capitalize(String name) { + if (name == null || name.length() == 0) { + return name; + } + char chars[] = name.toCharArray(); + chars[0] = Character.toUpperCase(chars[0]); + return new String(chars); + } + + public static String unCapitalize(String name) { + if (name == null || name.length() == 0) { + return name; + } + char chars[] = name.toCharArray(); + chars[0] = Character.toLowerCase(chars[0]); + return new String(chars); + } + + // -------------------- Class path tools -------------------- + + /** + * Add all the jar files in a dir to the classpath, represented as a Vector + * of URLs. 
+ */ + public static void addToClassPath(Vector<URL> cpV, String dir) { + try { + String cpComp[] = getFilesByExt(dir, ".jar"); + if (cpComp != null) { + int jarCount = cpComp.length; + for (int i = 0; i < jarCount; i++) { + URL url = getURL(dir, cpComp[i]); + if (url != null) + cpV.addElement(url); + } + } + } + catch (Exception ex) { + ex.printStackTrace(); + } + } + + public static void addToolsJar(Vector<URL> v) { + try { + // Add tools.jar in any case + File f = new File(System.getProperty("java.home") + + "/../lib/tools.jar"); + + if (!f.exists()) { + // On some systems java.home gets set to the root of jdk. + // That's a bug, but we can work around and be nice. + f = new File(System.getProperty("java.home") + "/lib/tools.jar"); + if (f.exists()) { + if (log.isDebugEnabled()) + log.debug("Detected strange java.home value " + + System.getProperty("java.home") + + ", it should point to jre"); + } + } + URL url = new URL("file", "", f.getAbsolutePath()); + + v.addElement(url); + } + catch (MalformedURLException ex) { + ex.printStackTrace(); + } + } + + /** + * Return all files with a given extension in a dir + */ + public static String[] getFilesByExt(String ld, String ext) { + File dir = new File(ld); + String[] names = null; + final String lext = ext; + if (dir.isDirectory()) { + names = dir.list(new FilenameFilter() { + public boolean accept(File d, String name) { + if (name.endsWith(lext)) { + return true; + } + return false; + } + }); + } + return names; + } + + /** + * Construct a file url from a file, using a base dir + */ + public static URL getURL(String base, String file) { + try { + File baseF = new File(base); + File f = new File(baseF, file); + String path = f.getCanonicalPath(); + if (f.isDirectory()) { + path += "/"; + } + if (!f.exists()) + return null; + return new URL("file", "", path); + } + catch (Exception ex) { + ex.printStackTrace(); + return null; + } + } + + /** + * Add elements from the classpath <i>cp </i> to a Vector <i>jars </i> as + 
* file URLs (We use Vector for JDK 1.1 compat). + * <p> + * + * @param jars The jar list + * @param cp a String classpath of directory or jar file elements + * separated by path.separator delimiters. + * @throws IOException If an I/O error occurs + * @throws MalformedURLException Doh ;) + */ + public static void addJarsFromClassPath(Vector<URL> jars, String cp) + throws IOException, MalformedURLException { + String sep = System.getProperty("path.separator"); + StringTokenizer st; + if (cp != null) { + st = new StringTokenizer(cp, sep); + while (st.hasMoreTokens()) { + File f = new File(st.nextToken()); + String path = f.getCanonicalPath(); + if (f.isDirectory()) { + path += "/"; + } + URL url = new URL("file", "", path); + if (!jars.contains(url)) { + jars.addElement(url); + } + } + } + } + + /** + * Return a URL[] that can be used to construct a class loader + */ + public static URL[] getClassPath(Vector<URL> v) { + URL[] urls = new URL[v.size()]; + for (int i = 0; i < v.size(); i++) { + urls[i] = v.elementAt(i); + } + return urls; + } + + /** + * Construct a URL classpath from files in a directory, a cpath property, + * and tools.jar. 
+ */ + public static URL[] getClassPath(String dir, String cpath, + String cpathProp, boolean addTools) throws IOException, + MalformedURLException { + Vector<URL> jarsV = new Vector<URL>(); + if (dir != null) { + // Add dir/classes first, if it exists + URL url = getURL(dir, "classes"); + if (url != null) + jarsV.addElement(url); + addToClassPath(jarsV, dir); + } + + if (cpath != null) + addJarsFromClassPath(jarsV, cpath); + + if (cpathProp != null) { + String cpath1 = System.getProperty(cpathProp); + addJarsFromClassPath(jarsV, cpath1); + } + + if (addTools) + addToolsJar(jarsV); + + return getClassPath(jarsV); + } + + // -------------------- other utils -------------------- + public static void clear() { + objectMethods.clear(); + } + + static Hashtable<Class<?>, Method[]> objectMethods = + new Hashtable<Class<?>, Method[]>(); + + public static Method[] findMethods(Class<?> c) { + Method methods[] = objectMethods.get(c); + if (methods != null) + return methods; + + methods = c.getMethods(); + objectMethods.put(c, methods); + return methods; + } + + public static Method findMethod(Class<?> c, String name, + Class<?> params[]) { + Method methods[] = findMethods(c); + if (methods == null) + return null; + for (int i = 0; i < methods.length; i++) { + if (methods[i].getName().equals(name)) { + Class<?> methodParams[] = methods[i].getParameterTypes(); + if (methodParams == null) + if (params == null || params.length == 0) + return methods[i]; + if (params == null) + if (methodParams == null || methodParams.length == 0) + return methods[i]; + if (params.length != methodParams.length) + continue; + boolean found = true; + for (int j = 0; j < params.length; j++) { + if (params[j] != methodParams[j]) { + found = false; + break; + } + } + if (found) + return methods[i]; + } + } + return null; + } + + /** + * Test if the object implements a particular + * method + */ + public static boolean hasHook(Object obj, String methodN) { + try { + Method myMethods[] = 
findMethods(obj.getClass()); + for (int i = 0; i < myMethods.length; i++) { + if (methodN.equals(myMethods[i].getName())) { + // check if it's overriden + Class<?> declaring = myMethods[i].getDeclaringClass(); + Class<?> parentOfDeclaring = declaring.getSuperclass(); + // this works only if the base class doesn't extend + // another class. + + // if the method is declared in a top level class + // like BaseInterceptor parent is Object, otherwise + // parent is BaseInterceptor or an intermediate class + if (!"java.lang.Object".equals(parentOfDeclaring.getName())) { + return true; + } + } + } + } + catch (Exception ex) { + ex.printStackTrace(); + } + return false; + } + + public static void callMain(Class<?> c, String args[]) throws Exception { + Class<?> p[] = new Class[1]; + p[0] = args.getClass(); + Method m = c.getMethod("main", p); + m.invoke(c, new Object[] { args }); + } + + public static Object callMethod1(Object target, String methodN, + Object param1, String typeParam1, ClassLoader cl) throws Exception { + if (target == null || param1 == null) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: Assert: Illegal params " + + target + " " + param1); + } + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: callMethod1 " + + target.getClass().getName() + " " + + param1.getClass().getName() + " " + typeParam1); + + Class<?> params[] = new Class[1]; + if (typeParam1 == null) + params[0] = param1.getClass(); + else + params[0] = cl.loadClass(typeParam1); + Method m = findMethod(target.getClass(), methodN, params); + if (m == null) + throw new NoSuchMethodException(target.getClass().getName() + " " + + methodN); + return m.invoke(target, new Object[] { param1 }); + } + + public static Object callMethod0(Object target, String methodN) + throws Exception { + if (target == null) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: Assert: Illegal params " + + target); + return null; + } + if (log.isDebugEnabled()) + 
log.debug("IntrospectionUtils: callMethod0 " + + target.getClass().getName() + "." + methodN); + + Class<?> params[] = new Class[0]; + Method m = findMethod(target.getClass(), methodN, params); + if (m == null) + throw new NoSuchMethodException(target.getClass().getName() + " " + + methodN); + return m.invoke(target, emptyArray); + } + + static Object[] emptyArray = new Object[] {}; + + public static Object callMethodN(Object target, String methodN, + Object params[], Class<?> typeParams[]) throws Exception { + Method m = null; + m = findMethod(target.getClass(), methodN, typeParams); + if (m == null) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: Can't find method " + methodN + + " in " + target + " CLASS " + target.getClass()); + return null; + } + Object o = m.invoke(target, params); + + if (log.isDebugEnabled()) { + // debug + StringBuilder sb = new StringBuilder(); + sb.append("" + target.getClass().getName() + "." + methodN + "( "); + for (int i = 0; i < params.length; i++) { + if (i > 0) + sb.append(", "); + sb.append(params[i]); + } + sb.append(")"); + log.debug("IntrospectionUtils:" + sb.toString()); + } + return o; + } + + public static Object convert(String object, Class<?> paramType) { + Object result = null; + if ("java.lang.String".equals(paramType.getName())) { + result = object; + } + else if ("java.lang.Integer".equals(paramType.getName()) + || "int".equals(paramType.getName())) { + try { + result = new Integer(object); + } + catch (NumberFormatException ex) {} + // Try a setFoo ( boolean ) + } + else if ("java.lang.Boolean".equals(paramType.getName()) + || "boolean".equals(paramType.getName())) { + result = new Boolean(object); + + // Try a setFoo ( InetAddress ) + } + else if ("java.net.InetAddress".equals(paramType + .getName())) { + try { + result = InetAddress.getByName(object); + } + catch (UnknownHostException exc) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: Unable to resolve host name:" + + object); + } + 
+ // Unknown type + } + else { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: Unknown type " + + paramType.getName()); + } + if (result == null) { + throw new IllegalArgumentException("Can't convert argument: " + object); + } + return result; + } + + // -------------------- Get property -------------------- + // This provides a layer of abstraction + + public static interface PropertySource { + + public String getProperty(String key); + + } + + public static interface AttributeHolder { + + public void setAttribute(String key, Object o); + + } + +} \ No newline at end of file Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/shell/ClusterMessageCommand.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/shell/ClusterMessageCommand.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,92 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.cluster.shell; + +import java.io.PrintStream; +import java.io.Serializable; +import java.lang.reflect.Method; +import java.util.StringTokenizer; + +import org.amdatu.core.fabric.cluster.ClusterMemberService; +import org.amdatu.core.fabric.cluster.ClusterMessageService; +import org.apache.felix.shell.Command; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; + +public class ClusterMessageCommand implements Command { + + BundleContext m_bundleContext; + + public String getName() { + return "cmesg"; + } + + public String getShortDescription() { + return "Send a message"; + } + + public String getUsage() { + return "cmesg <nodeid> <topic> <message>"; + } + + public void execute(String s, PrintStream out, PrintStream err) { + + String serviceName = ClusterMessageService.class.getName(); + String methodName = "publish"; + Class[] params = new Class[] { String.class, Serializable.class }; + + StringTokenizer st = new StringTokenizer(s, " "); + st.nextToken(); // ignore command itself + + if (st.countTokens() != 3) { + out.println(getUsage()); + return; + } + + String nodeid = st.nextToken(); + String topic = st.nextToken(); + String message = st.nextToken(); + + try { + Object serviceObject = null; + ServiceReference[] refs = m_bundleContext.getServiceReferences( + serviceName, "(" + + ClusterMemberService.CLUSTER_MEMBERID_PROP + "=" + nodeid + ")"); + if (refs != null && refs.length > 0) { + serviceObject = m_bundleContext.getService(refs[0]); + } + if (serviceObject == null) { + out.println("Unable to locate service " + serviceName + " " + "(" + + ClusterMemberService.CLUSTER_MEMBERID_PROP + "=" + nodeid + ")"); + return; + } + + Class serviceClass = serviceObject.getClass(); + Method methodObject = serviceClass.getMethod(methodName, params); + Object result = methodObject.invoke(serviceObject, new Object[] { topic, message }); + if (result != null) { + out.println(result.toString()); + } + + } + 
catch (Exception e) { + out.println("Something went wrong :S"); + e.printStackTrace(out); + } + } + +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/test/ClusterMessageTester.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/test/ClusterMessageTester.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,49 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.cluster.test; + +import org.amdatu.core.fabric.cluster.ClusterMessageService; +import org.amdatu.core.fabric.cluster.ClusterTopicListener; + +public class ClusterMessageTester { + + // injected + private volatile ClusterMessageService m_clusterMessageService; + + private ClusterTopicListener m_clusterTopicListener; + + public void start() { + m_clusterTopicListener = new ClusterTopicListener() { + public void recieveMessage(Object message) { + System.err.println("Recieved: " + message); + } + + public String getTopic() { + return "testtopic"; + } + }; + m_clusterMessageService.subscribe(m_clusterTopicListener); + } + + public void stop() { + m_clusterMessageService.unsubscribe(m_clusterTopicListener); + } + + public void send(String message) { + m_clusterMessageService.publish("testtopic", message); + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/osgi/Activator.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,149 @@ +/* +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.osgi; + +import java.util.Dictionary; +import java.util.Hashtable; +import java.util.UUID; + +import org.amdatu.core.fabric.cluster.ClusterMemberService; +import org.amdatu.core.fabric.cluster.ClusterMessageService; +import org.amdatu.core.fabric.cluster.internal.tribes.ClusterMemberServiceImpl; +import org.amdatu.core.fabric.cluster.internal.tribes.ClusterMessageServiceImpl; +import org.amdatu.core.fabric.cluster.shell.ClusterMessageCommand; +import org.amdatu.core.fabric.remote.DiscoveryService; +import org.amdatu.core.fabric.remote.DistributionService; +import org.amdatu.core.fabric.remote.RemotableServiceEndpoint; +import org.amdatu.core.fabric.remote.RemoteServiceEndPoint; +import org.amdatu.core.fabric.remote.service.DiscoveryServiceImpl; +import org.amdatu.core.fabric.remote.service.DistributionServiceImpl; +import org.amdatu.core.fabric.remote.shell.DiscoveryListCommand; +import org.amdatu.core.fabric.test.FabricTestService; +import org.amdatu.core.fabric.test.RemotableService; +import org.amdatu.core.fabric.test.service.FabricTestServiceImpl; +import org.amdatu.core.fabric.test.shell.FabricTestCommand; +import org.apache.felix.dm.DependencyActivatorBase; +import org.apache.felix.dm.DependencyManager; +import org.apache.felix.shell.Command; +import org.osgi.framework.BundleContext; +import org.osgi.service.log.LogService; + +public class Activator extends DependencyActivatorBase { + + @Override + public void init(BundleContext context, DependencyManager manager) throws Exception { + + // Generic props + + String memberid = UUID.randomUUID().toString(); + Dictionary<String, Object> cm1props = new Hashtable<String, Object>(); + cm1props.put(ClusterMemberService.CLUSTER_CLUSTERID_PROP, "CLUSTER1"); + cm1props.put(ClusterMemberService.CLUSTER_MEMBERID_PROP, memberid); + cm1props.put(ClusterMemberServiceImpl.CLUSTER_TRIBES_ARGS_PROP, new String[] { "-port", "8880" }); + + // ClusterMemberService + + manager.add( + 
createComponent() + .setImplementation(new ClusterMemberServiceImpl("CLUSTER1", memberid, cm1props)) + .setInterface(ClusterMemberService.class.getName(), cm1props) + .add(createServiceDependency().setService(LogService.class).setRequired(false))); + + // ClusterMessageService + + manager.add( + createComponent() + .setImplementation(new ClusterMessageServiceImpl()) + .setInterface(ClusterMessageService.class.getName(), cm1props) + .add( + createServiceDependency().setService(ClusterMemberService.class).setRequired(true)) + .add(createServiceDependency().setService(LogService.class).setRequired(false))); + + // DiscoveryService + + manager.add( + createComponent() + .setImplementation(new DiscoveryServiceImpl()) + .setInterface(DiscoveryService.class.getName(), cm1props) + .add( + createServiceDependency().setService(ClusterMessageService.class).setRequired(true)) + .add( + createServiceDependency().setService(RemotableServiceEndpoint.class) + .setCallbacks("remotableServiceEndPointAdded", "remotableServiceEndPointRemoved") + .setRequired(false)) + .add(createServiceDependency().setService(LogService.class).setRequired(false))); + + // DistributionService + + manager.add( + createComponent() + .setImplementation(new DistributionServiceImpl()) + .setInterface(DistributionService.class.getName(), cm1props) + .add( + createServiceDependency().setService(ClusterMemberService.class).setRequired(true)) + .add( + createServiceDependency().setService(ClusterMessageService.class).setRequired(true)) + .add( + createServiceDependency().setService("(" + DistributionService.REMOTABLE_PROP + "=true)") + .setCallbacks("localRemotableServiceAdded", "localRemotableServiceRemoved") + .setRequired(false)) + .add( + createServiceDependency().setService(RemoteServiceEndPoint.class) + .setCallbacks("remoteServiceEndPointAdded", "remoteServiceEndPointRemoved") + .setRequired(false)) + .add(createServiceDependency().setService(LogService.class).setRequired(false))); + + // Test & Shell + 
+ manager.add( + createComponent() + .setImplementation(new FabricTestServiceImpl()) + .setInterface(FabricTestService.class.getName(), null) + .add(createServiceDependency().setService(LogService.class).setRequired(false))); + + manager.add( + createComponent() + .setImplementation(new FabricTestCommand()) + .setInterface(Command.class.getName(), null) + .add( + createServiceDependency().setService(FabricTestService.class).setRequired(true)) + .add( + createServiceDependency().setService(RemotableService.class).setRequired(false) + .setCallbacks("remotableServiceAdded", "remotableServiceRemoved")) + .add( + createServiceDependency().setService(LogService.class).setRequired(false) + )); + + manager.add( + createComponent() + .setImplementation(new ClusterMessageCommand()) + .setInterface(Command.class.getName(), null) + .add(createServiceDependency().setService(LogService.class).setRequired(false))); + + manager.add( + createComponent() + .setImplementation(new DiscoveryListCommand()) + .setInterface(Command.class.getName(), null) + .add(createServiceDependency().setService(LogService.class).setRequired(false))); + + } + + @Override + public void destroy(BundleContext context, DependencyManager manager) throws Exception { + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DiscoveryService.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DiscoveryService.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,28 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.amdatu.core.fabric.remote; + +public interface DiscoveryService { + + public static final String DISCOVERY_TOPIC = "org.amdatu.fabric.remote.DISCOVERY"; + final static String REMOTABLE_PROP = "org.amdatu.fabric.remote.REMOTABLE"; + final static String REMOTE_PROP = "org.amdatu.fabric.remote.REMOTE"; + + ServiceEndPoint[] getLocalServiceEndPoints(); + + ServiceEndPoint[] getRemoteServiceEndPoints(); +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/DistributionService.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,25 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
/**
 * Constants of the distribution service; the interface itself declares no methods.
 */
public interface DistributionService {

    /** Name of the cluster topic used for remote invocation traffic. */
    String REMOTE_TOPIC = "org.amdatu.fabric.remote.REMOTE";

    /** Service property key; services carrying the value "true" are tracked as remotable. */
    String REMOTABLE_PROP = "org.amdatu.fabric.remote.REMOTABLE";

    /** Presumably the service property key that marks a service as remote - TODO confirm. */
    String REMOTE_PROP = "org.amdatu.fabric.remote.REMOTE";

}
+ */ +package org.amdatu.core.fabric.remote; + +public interface RemotableServiceEndpoint { + + ServiceEndPoint getServiceEndPoint(); +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/RemoteServiceEndPoint.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/RemoteServiceEndPoint.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,23 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.amdatu.core.fabric.remote; + +public interface RemoteServiceEndPoint { + + ServiceEndPoint getServiceEndPoint(); + +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,122 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.amdatu.core.fabric.remote; + +import java.io.Serializable; +import java.util.Enumeration; +import java.util.Hashtable; + +public class ServiceEndPoint implements Serializable { + + private static final long serialVersionUID = 7613755184494120032L; + + private String m_clusterId; + private String m_memberId; + private String[] m_objectClass; + private long m_originalServiceId; + private long m_localServiceId; + private Hashtable<String, Object> m_properties; + + public ServiceEndPoint() { + } + + public String getClusterId() { + return m_clusterId; + } + + public void setClusterId(String clusterId) { + m_clusterId = clusterId; + } + + public String getMemberId() { + return m_memberId; + } + + public void setMemberId(String memberId) { + m_memberId = memberId; + } + + public String[] getObjectClass() { + return m_objectClass; + } + + public void setObjectClass(String[] objectClass) { + m_objectClass = objectClass; + } + + public long getOriginalServiceId() { + return m_originalServiceId; + } + + public void setOriginalServiceId(long originalServiceId) { + m_originalServiceId = originalServiceId; + } + + public long getLocalServiceId() { + return m_localServiceId; + } + + public void setLocalServiceId(long localServiceId) { + m_localServiceId = localServiceId; + } + + public Hashtable<String, Object> getProperties() { + return m_properties; + } + + public void setProperties(Hashtable<String, Object> properties) { + m_properties = properties; + } + + @Override + public boolean equals(Object obj) { + ServiceEndPoint other = (ServiceEndPoint) obj; + if 
(!getClusterId().equals(other.getClusterId())) + return false; + if (!getMemberId().equals(other.getMemberId())) + return false; + if (getOriginalServiceId() != other.getOriginalServiceId()) + return false; + return true; + } + + @Override + public int hashCode() { + return (getClusterId() + getMemberId() + getOriginalServiceId()).hashCode(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("ServiceEndPoint["); + sb.append("clusterid=" + getClusterId()); + sb.append("; memberid=" + getMemberId()); + sb.append("; objectClass={"); + for (String part : getObjectClass()) { + sb.append(" " + part); + } + sb.append(" }; properties={"); + Enumeration<String> enumeration = getProperties().keys(); + while (enumeration.hasMoreElements()) { + String key = (String) enumeration.nextElement(); + sb.append(" " + key + ":" + getProperties().get(key).toString()); + } + sb.append("}]"); + return sb.toString(); + } + +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,34 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. 
If not, see <http://www.gnu.org/licenses/>. + */ +package org.amdatu.core.fabric.remote.internal; + +import java.io.Serializable; + +import org.amdatu.core.fabric.remote.ServiceEndPoint; + +public class EndpointDepublishMessage implements Serializable { + + private ServiceEndPoint m_serviceEndPoint; + + public EndpointDepublishMessage(ServiceEndPoint serviceEndPoint) { + m_serviceEndPoint = serviceEndPoint; + } + + public ServiceEndPoint getServiceEndPoint() { + return m_serviceEndPoint; + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,25 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
/**
 * Payload-free cluster message; DiscoveryServiceImpl publishes one on the discovery topic
 * when it starts.
 */
public class EndpointDiscoveryMessage implements Serializable {

    /**
     * Creates the message; there is no state to initialise.
     */
    public EndpointDiscoveryMessage() {
    }
}
+ */ +package org.amdatu.core.fabric.remote.internal; + +import java.io.Serializable; +import java.util.Map; + +import org.amdatu.core.fabric.remote.ServiceEndPoint; + +public class EndpointInvokeMessage implements Serializable { + + private ServiceEndPoint m_serviceEndPoint; + private Map<String, Object> m_payload; + + public EndpointInvokeMessage(ServiceEndPoint serviceEndPoint, Map<String, Object> payload) { + m_serviceEndPoint = serviceEndPoint; + m_payload = payload; + } + + public ServiceEndPoint getServiceEndPoint() { + return m_serviceEndPoint; + } + + public Map<String, Object> getPayload() { + return m_payload; + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,34 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.remote.internal; + +import java.io.Serializable; + +import org.amdatu.core.fabric.remote.ServiceEndPoint; + +public class EndpointPublishMessage implements Serializable { + + private ServiceEndPoint m_serviceEndPoint; + + public EndpointPublishMessage(ServiceEndPoint serviceEndPoint) { + m_serviceEndPoint = serviceEndPoint; + } + + public ServiceEndPoint getServiceEndPoint() { + return m_serviceEndPoint; + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,41 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.remote.internal; + +import java.io.Serializable; +import java.util.Map; + +import org.amdatu.core.fabric.remote.ServiceEndPoint; + +public class EndpointResponseMessage implements Serializable { + + private ServiceEndPoint m_serviceEndPoint; + private Map<String, Object> m_payload; + + public EndpointResponseMessage(ServiceEndPoint serviceEndPoint, Map<String, Object> payload) { + m_serviceEndPoint = serviceEndPoint; + m_payload = payload; + } + + public ServiceEndPoint getServiceEndPoint() { + return m_serviceEndPoint; + } + + public Map<String, Object> getPayload() { + return m_payload; + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,201 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.remote.internal; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.amdatu.core.fabric.cluster.ClusterMessageService; +import org.amdatu.core.fabric.cluster.ClusterTopicListener; +import org.amdatu.core.fabric.remote.DistributionService; +import org.amdatu.core.fabric.remote.ServiceEndPoint; + +/** + * I am a delegate to the DistributionService + * I proxy local service invocations and put them over the cluster + */ +public class LocalServiceInvocationHandler implements InvocationHandler, ClusterTopicListener { + + private final static String MESSAGE_INVOCATION_ID_KEY = "MII"; + private final static String MESSAGE_INVOCATION_SERVICEENDPOINTINFO_KEY = "MIEI"; + private final static String MESSAGE_INVOCATION_METHODNAME_KEY = "MIMK"; + private final static String MESSAGE_INVOCATION_ARGUMENTS_KEY = "MIAK"; + private final static String MESSAGE_RESPONSE_MAP_KEY = "MRM"; + private final static long INVOCATION_TIMEOUT = 10000; + + private final Set<String> m_invocationIdentifiers = new HashSet<String>(); + private final Map<String, Object> myInvocationResponses = new HashMap<String, Object>(); + + private volatile ClusterMessageService m_clusterMessageService; + + private final ServiceEndPoint m_serviceEndpoint; + private final Class<?>[] m_interfaceClasses; + + private String m_serviceEndpointTopic; + + public LocalServiceInvocationHandler(ServiceEndPoint serviceEndpoint, Class<?>[] interfaceClasses) { + m_serviceEndpoint = serviceEndpoint; + m_interfaceClasses = interfaceClasses; + m_serviceEndpointTopic = DistributionService.REMOTE_TOPIC; + } + + /* + * lifecycle + */ + + public void start() { + m_clusterMessageService.subscribe(this); + } + + public void stop() { + m_clusterMessageService.unsubscribe(this); + } + + public ServiceEndPoint getServiceEndPoint() { + 
return m_serviceEndpoint; + } + + /* + * InvocationHandler interface + */ + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + + if (isServiceIntefaceInvocation(method)) { + String invocationIdentifier = createNewInvocationIdentifier(); + Map<String, Object> payload = getInvocationPayload(invocationIdentifier, method, args); + m_clusterMessageService.publish(m_serviceEndpointTopic, new EndpointInvokeMessage(m_serviceEndpoint, + payload)); + Object response = retrieveInvocationResponse(invocationIdentifier); + return response; + } + return method.invoke(this, args); + } + + /* + * ClusterTopicListener interface + */ + public String getTopic() { + return m_serviceEndpointTopic; + } + + public void recieveMessage(Object message) { + if (message instanceof EndpointResponseMessage) { + EndpointResponseMessage endpointResponseMessage = (EndpointResponseMessage) message; + Map<String, Object> payload = endpointResponseMessage.getPayload(); + String invocationId = (String) payload.get(MESSAGE_INVOCATION_ID_KEY); + if (ownsInvocationIndentifier(invocationId)) { + Object response = payload.get(MESSAGE_RESPONSE_MAP_KEY); + storeResponseObject(invocationId, response); + removeInvocationIdentifier(invocationId); + System.err.println("Stored response: " + response); + } + } + } + + /* + * private + */ + + private String createNewInvocationIdentifier() { + String invocationId = UUID.randomUUID().toString(); + synchronized (m_invocationIdentifiers) { + m_invocationIdentifiers.add(invocationId); + } + return invocationId; + } + + private Map<String, Object> getInvocationPayload(String invocationId, Method method, Object[] args) { + Map<String, Object> payload = new HashMap<String, Object>(); + payload.put(MESSAGE_INVOCATION_SERVICEENDPOINTINFO_KEY, m_serviceEndpoint); + payload.put(MESSAGE_INVOCATION_ID_KEY, invocationId); + payload.put(MESSAGE_INVOCATION_METHODNAME_KEY, method.getName()); + payload.put(MESSAGE_INVOCATION_ARGUMENTS_KEY, args); + 
return payload; + } + + private Object retrieveInvocationResponse(String invocationId) { + long invocationStart = System.currentTimeMillis(); + boolean isResponseRecieved = false; + boolean isResponseTimedOut = false; + Object response = null; + while (!isResponseRecieved && !isResponseTimedOut) { + if (invocationResponseRecieved(invocationId)) { + response = retrieveInvocationResponseObject(invocationId); + isResponseRecieved = true; + System.err.println("Recieved repsonse for invocation response: " + invocationId); + } + else { + isResponseTimedOut = isTimedOut(invocationStart); + } + } + if (isResponseTimedOut) { + System.err.println("Timed out waiting for invocation response: " + invocationId); + } + System.err.println("Invocation took " + (System.currentTimeMillis() - invocationStart) + " ms"); + return response; + } + + private boolean isTimedOut(final long invocationStart) { + return (System.currentTimeMillis() - invocationStart) > INVOCATION_TIMEOUT; + } + + private boolean invocationResponseRecieved(String invocationId) { + synchronized (myInvocationResponses) { + return myInvocationResponses.containsKey(invocationId); + } + } + + private Object retrieveInvocationResponseObject(final String invocationId) { + synchronized (myInvocationResponses) { + return myInvocationResponses.remove(invocationId); + } + } + + private boolean ownsInvocationIndentifier(final String invocationId) { + synchronized (m_invocationIdentifiers) { + return m_invocationIdentifiers.contains(invocationId); + } + } + + private void storeResponseObject(final String invocationId, final Object payload) { + synchronized (myInvocationResponses) { + myInvocationResponses.put(invocationId, payload); + } + } + + private void removeInvocationIdentifier(final String invocationId) { + synchronized (m_invocationIdentifiers) { + m_invocationIdentifiers.remove(invocationId); + } + } + + private boolean isServiceIntefaceInvocation(Method method) { + for (Class<?> serviceInterfaceClass : 
m_interfaceClasses) { + for (Method serviceMethod : serviceInterfaceClass.getMethods()) { + if (method.equals(serviceMethod)) { + return true; + } + } + } + return false; + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,174 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.remote.service; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.amdatu.core.fabric.cluster.ClusterMessageService; +import org.amdatu.core.fabric.cluster.ClusterTopicListener; +import org.amdatu.core.fabric.remote.DiscoveryService; +import org.amdatu.core.fabric.remote.RemotableServiceEndpoint; +import org.amdatu.core.fabric.remote.RemoteServiceEndPoint; +import org.amdatu.core.fabric.remote.ServiceEndPoint; +import org.amdatu.core.fabric.remote.internal.EndpointDepublishMessage; +import org.amdatu.core.fabric.remote.internal.EndpointDiscoveryMessage; +import org.amdatu.core.fabric.remote.internal.EndpointPublishMessage; +import org.apache.felix.dm.Component; +import org.apache.felix.dm.DependencyManager; +import org.osgi.framework.ServiceReference; +import org.osgi.service.log.LogService; + +/** + * I keep track of local RemotableServiceEndpoint services and publish them in the cluster + * I listen to the cluster and publish local RemoteServiceEndpoint services for them + * TODO support LOOKUP requests for more fine grained discovery + */ +public class DiscoveryServiceImpl implements DiscoveryService, ClusterTopicListener { + + private final Set<ServiceEndPoint> m_remotableEndPoints = new HashSet<ServiceEndPoint>(); + private final Map<ServiceEndPoint, Component> m_remoteEndPointComponents = + new HashMap<ServiceEndPoint, Component>(); + + private volatile ClusterMessageService m_clusterMessageService; + private volatile DependencyManager m_dependencyManager; + private volatile Component m_component; + private volatile LogService m_logService; + + /******************************************************** + * Constructors + ********************************************************/ + + public DiscoveryServiceImpl() { + + } + + /******************************************************** + * Life cycle + ********************************************************/ + + 
public synchronized void start() { + m_clusterMessageService.subscribe(this); + m_clusterMessageService.publish(DISCOVERY_TOPIC, new EndpointDiscoveryMessage()); + } + + public synchronized void stop() { + m_clusterMessageService.unsubscribe(this); + } + + /******************************************************** + * Callbacks + ********************************************************/ + + public void remotableServiceEndPointAdded( + ServiceReference serviceReference, Object remotableServiceEndpointObject) { + RemotableServiceEndpoint remotableServiceEndpoint = (RemotableServiceEndpoint) remotableServiceEndpointObject; + ServiceEndPoint serviceEndPoint = remotableServiceEndpoint.getServiceEndPoint(); + synchronized (m_remotableEndPoints) { + if (!m_remotableEndPoints.contains(serviceEndPoint)) { + m_remotableEndPoints.add(serviceEndPoint); + m_clusterMessageService.publish(DISCOVERY_TOPIC, new EndpointPublishMessage(serviceEndPoint)); + } + else { + throw new IllegalStateException("Unexpected state... needs analysis"); + } + + } + } + + public void remotableServiceEndPointRemoved( + ServiceReference serviceReference, Object remotableServiceEndpointObject) { + RemotableServiceEndpoint remotableServiceEndpoint = (RemotableServiceEndpoint) remotableServiceEndpointObject; + ServiceEndPoint serviceEndPoint = remotableServiceEndpoint.getServiceEndPoint(); + synchronized (m_remotableEndPoints) { + if (m_remotableEndPoints.contains(serviceEndPoint)) { + m_clusterMessageService.publish(DISCOVERY_TOPIC, new EndpointDepublishMessage(serviceEndPoint)); + m_remotableEndPoints.remove(serviceEndPoint); + } + else { + throw new IllegalStateException("Unexpected state... 
needs analysis"); + } + } + } + + /******************************************************** + * DiscoveryService + ********************************************************/ + + public ServiceEndPoint[] getLocalServiceEndPoints() { + synchronized (m_remotableEndPoints) { + return m_remotableEndPoints.toArray(new ServiceEndPoint[m_remotableEndPoints.size()]); + } + } + + public ServiceEndPoint[] getRemoteServiceEndPoints() { + synchronized (m_remoteEndPointComponents) { + return m_remoteEndPointComponents.keySet().toArray( + new ServiceEndPoint[m_remoteEndPointComponents.keySet().size()]); + } + } + + /******************************************************** + * ClusterTopicListner + ********************************************************/ + + // TODO handle LOOKUP + public void recieveMessage(Object message) { + if (message instanceof EndpointPublishMessage) { + EndpointPublishMessage endpointPublishMessage = (EndpointPublishMessage) message; + ServiceEndPoint serviceEndPoint = endpointPublishMessage.getServiceEndPoint(); + synchronized (m_remoteEndPointComponents) { + if (m_remoteEndPointComponents.containsKey(serviceEndPoint)) + return; + Component serviceComponent = + m_dependencyManager.createComponent().setInterface(RemoteServiceEndPoint.class.getName(), null) + .setImplementation(new RemoteServiceEndPointImpl(serviceEndPoint)); + // FIXME validate that this depdendency works + serviceComponent.add(m_dependencyManager.createServiceDependency().setService( + DiscoveryService.class, m_component.getServiceRegistration().getReference())); + m_dependencyManager.add(serviceComponent); + m_remoteEndPointComponents.put(serviceEndPoint, serviceComponent); + } + return; + } + if (message instanceof EndpointDepublishMessage) { + EndpointDepublishMessage endpointDepublishMessage = (EndpointDepublishMessage) message; + ServiceEndPoint serviceEndPoint = endpointDepublishMessage.getServiceEndPoint(); + synchronized (m_remoteEndPointComponents) { + if 
(!m_remoteEndPointComponents.containsKey(serviceEndPoint)) + return; + Component serviceComponent = m_remoteEndPointComponents.remove(serviceEndPoint); + m_dependencyManager.remove(serviceComponent); + } + return; + } + if (message instanceof EndpointDiscoveryMessage) { + synchronized (m_remotableEndPoints) { + for (ServiceEndPoint serviceEndPoint : m_remotableEndPoints) { + m_clusterMessageService.publish(DISCOVERY_TOPIC, new EndpointPublishMessage(serviceEndPoint)); + } + } + return; + } + } + + public String getTopic() { + return DISCOVERY_TOPIC; + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,303 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.remote.service; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Map; + +import org.amdatu.core.fabric.cluster.ClusterMemberService; +import org.amdatu.core.fabric.cluster.ClusterMessageService; +import org.amdatu.core.fabric.cluster.ClusterTopicListener; +import org.amdatu.core.fabric.remote.DiscoveryService; +import org.amdatu.core.fabric.remote.DistributionService; +import org.amdatu.core.fabric.remote.RemotableServiceEndpoint; +import org.amdatu.core.fabric.remote.RemoteServiceEndPoint; +import org.amdatu.core.fabric.remote.ServiceEndPoint; +import org.amdatu.core.fabric.remote.internal.EndpointInvokeMessage; +import org.amdatu.core.fabric.remote.internal.EndpointResponseMessage; +import org.amdatu.core.fabric.remote.internal.LocalServiceInvocationHandler; +import org.apache.felix.dm.Component; +import org.apache.felix.dm.DependencyManager; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.osgi.service.log.LogService; + +public class DistributionServiceImpl implements DistributionService, ClusterTopicListener { + + private final static String MESSAGE_INVOCATION_ID_KEY = "MII"; + private final static String MESSAGE_INVOCATION_SERVICEENDPOINTINFO_KEY = "MIEI"; + private final static String MESSAGE_INVOCATION_METHODNAME_KEY = "MIMK"; + private final static String MESSAGE_INVOCATION_ARGUMENTS_KEY = "MIAK"; + private final static String MESSAGE_RESPONSE_MAP_KEY = "MRM"; + + private final Map<ServiceEndPoint, ServiceReferenceComponentTuple> m_serviceEndPointServiceReferenceComponents = + new HashMap<ServiceEndPoint, ServiceReferenceComponentTuple>(); + + private final Map<ServiceEndPoint, Component> m_serviceEndPointComponents = + new HashMap<ServiceEndPoint, Component>(); + + private volatile ClusterMemberService m_clusterMemberService; 
+ private volatile ClusterMessageService m_clusterMessageService; + private volatile DependencyManager m_dependencyManager; + private volatile BundleContext m_bundleContext; + private volatile Component m_component; + private volatile LogService m_logService; + + /******************************************************** + * Constructors + ********************************************************/ + + public DistributionServiceImpl() { + } + + /******************************************************** + * Service life cycle methods + ********************************************************/ + + public synchronized void start() { + m_clusterMessageService.subscribe(this); + } + + public synchronized void stop() { + m_clusterMessageService.unsubscribe(this); + } + + /******************************************************** + * Dependency Callback methods + ********************************************************/ + + public void localRemotableServiceAdded(ServiceReference serviceReference /* , Object Service */) { + ServiceEndPoint serviceEndPoint = serviceEndPointFromServiceReference(serviceReference); + synchronized (m_serviceEndPointServiceReferenceComponents) { + if (!m_serviceEndPointServiceReferenceComponents.containsKey(serviceEndPoint)) { + Component serviceComponent = + m_dependencyManager.createComponent().setInterface(RemotableServiceEndpoint.class.getName(), null) + .setImplementation(new RemotableServiceEndPointImpl(serviceEndPoint)); + // FIXME validate that this depdendency works + serviceComponent.add(m_dependencyManager.createServiceDependency().setService( + DistributionService.class, m_component.getServiceRegistration().getReference())); + m_dependencyManager.add(serviceComponent); + m_serviceEndPointServiceReferenceComponents.put(serviceEndPoint, new ServiceReferenceComponentTuple( + serviceReference, serviceComponent)); + } + } + } + + public void localRemotableServiceRemoved(ServiceReference serviceReference /* , Object Service */) { + 
ServiceEndPoint serviceEndPoint = serviceEndPointFromServiceReference(serviceReference); + synchronized (m_serviceEndPointServiceReferenceComponents) { + if (m_serviceEndPointServiceReferenceComponents.containsKey(serviceEndPoint)) { + ServiceReferenceComponentTuple serviceReferenceComponentTuple = + m_serviceEndPointServiceReferenceComponents.remove(serviceEndPoint); + m_dependencyManager.remove(serviceReferenceComponentTuple.getComponent()); + } + else { + throw new IllegalStateException("Unexpected state... this needs analysis"); + } + } + } + + public void remoteServiceEndPointAdded(/* ServiceReference serviceReference, */Object remoteServiceEndPointObject) { + RemoteServiceEndPoint remoteServiceEndPoint = (RemoteServiceEndPoint) remoteServiceEndPointObject; + ServiceEndPoint serviceEndPoint = remoteServiceEndPoint.getServiceEndPoint(); + Object localServiceProxy = createLocalServiceInvocationHandler(serviceEndPoint); + if (localServiceProxy != null) { + Component localServiceComponent = createLocalServiceComponent(serviceEndPoint, localServiceProxy); + synchronized (m_serviceEndPointComponents) { + if (!m_serviceEndPointComponents.containsKey(serviceEndPoint)) { + m_serviceEndPointComponents.put(serviceEndPoint, localServiceComponent); + m_dependencyManager.add(localServiceComponent); + } + else { + throw new IllegalStateException("Unexpected state... 
this needs analysis"); + } + } + } + } + + public void remoteServiceEndPointRemoved(/* ServiceReference serviceReference, */Object remoteServiceEndPointObject) { + RemoteServiceEndPoint remoteServiceEndPoint = (RemoteServiceEndPoint) remoteServiceEndPointObject; + ServiceEndPoint serviceEndPoint = remoteServiceEndPoint.getServiceEndPoint(); + synchronized (m_serviceEndPointComponents) { + if (m_serviceEndPointComponents.containsKey(serviceEndPoint)) { + Component localServiceComponent = m_serviceEndPointComponents.get(serviceEndPoint); + m_dependencyManager.remove(localServiceComponent); + } + else { + throw new IllegalStateException("Unexpected state... this needs analysis"); + } + } + } + + /******************************************************** + * ClusterTopicListener + ********************************************************/ + + public String getTopic() { + return DistributionService.REMOTE_TOPIC; + } + + public void recieveMessage(Object message) { + if (message instanceof EndpointInvokeMessage) { + + EndpointInvokeMessage endpointInvokeMessage = (EndpointInvokeMessage) message; + Map<String, Object> payload = endpointInvokeMessage.getPayload(); + String invocationId = (String) payload.get(MESSAGE_INVOCATION_ID_KEY); + ServiceEndPoint serviceEndpoint = (ServiceEndPoint) payload + .get(MESSAGE_INVOCATION_SERVICEENDPOINTINFO_KEY); + + String methodName = (String) payload.get(MESSAGE_INVOCATION_METHODNAME_KEY); + Object[] args = (Object[]) payload.get(MESSAGE_INVOCATION_ARGUMENTS_KEY); + + Class<?>[] types; + if (args == null) { + types = new Class[0]; + } + else { + types = new Class[args.length]; + for (int i = 0; i < args.length; i++) { + types[i] = args[i].getClass(); + } + } + + synchronized (m_serviceEndPointServiceReferenceComponents) { + ServiceReferenceComponentTuple serviceReferenceComponentTuple = + m_serviceEndPointServiceReferenceComponents.get(serviceEndpoint); + if (serviceReferenceComponentTuple == null) { + // TODO local service gone.. 
what to do? Send back error to prevent client from waiting... + return; + } + ServiceReference serviceReference = serviceReferenceComponentTuple.getServiceReference(); + try { + Object serviceObject = m_bundleContext.getService(serviceReference); + if (serviceObject == null) { + // TODO local service gone.. what to do? Send back error to prevent client from waiting... + return; + } + Method serviceMethod; + serviceMethod = serviceObject.getClass().getMethod(methodName, types); + Object serviceResponse; + serviceResponse = serviceMethod.invoke(serviceObject, args); + + m_bundleContext.ungetService(serviceReference); + + Map<String, Object> responsePayload = new HashMap<String, Object>(); + responsePayload.put(MESSAGE_INVOCATION_ID_KEY, invocationId); + responsePayload.put(MESSAGE_RESPONSE_MAP_KEY, serviceResponse); + + m_clusterMessageService + .publish(REMOTE_TOPIC, new EndpointResponseMessage(serviceEndpoint, responsePayload)); + + } + catch (SecurityException e) { + // TODO its fooked.. what to do? Send back error to prevent client from waiting... + e.printStackTrace(); + } + catch (NoSuchMethodException e) { + // TODO its fooked.. what to do? Send back error to prevent client from waiting... + e.printStackTrace(); + } + catch (IllegalArgumentException e) { + // TODO its fooked.. what to do? Send back error to prevent client from waiting... + e.printStackTrace(); + } + catch (IllegalAccessException e) { + // TODO its fooked.. what to do? Send back error to prevent client from waiting... + e.printStackTrace(); + } + catch (InvocationTargetException e) { + // TODO its fooked.. what to do? Send back error to prevent client from waiting... 
+ e.printStackTrace(); + } + } + } + } + + /* + * private + */ + + private Object createLocalServiceInvocationHandler(ServiceEndPoint serviceEndpoint) { + Class<?>[] interfaceClasses = new Class<?>[serviceEndpoint.getObjectClass().length]; + for (int i = 0; i < serviceEndpoint.getObjectClass().length; i++) { + String interfaceName = serviceEndpoint.getObjectClass()[i]; + try { + interfaceClasses[i] = m_bundleContext.getBundle().loadClass(interfaceName); + } + catch (ClassNotFoundException e) { + System.err.println("Unable to load interface classes for endpoint: " + serviceEndpoint.toString()); + return null; + } + } + LocalServiceInvocationHandler localServiceInvocationHandler = + new LocalServiceInvocationHandler(serviceEndpoint, interfaceClasses); + Object serviceObject = Proxy.newProxyInstance(interfaceClasses[0].getClassLoader(), + interfaceClasses, localServiceInvocationHandler); + return serviceObject; + } + + private Component createLocalServiceComponent(ServiceEndPoint serviceEndPoint, Object serviceObject) { + Hashtable<String, Object> registrationProperties = serviceEndPoint.getProperties(); + registrationProperties.remove(DiscoveryService.REMOTABLE_PROP); + registrationProperties.put(DiscoveryService.REMOTE_PROP, "true"); + registrationProperties.put("amdatu_endpoint", serviceEndPoint.toString()); + + Component component = m_dependencyManager.createComponent() + .setInterface(serviceEndPoint.getObjectClass(), registrationProperties) + .setImplementation(serviceObject); + + component.add(m_dependencyManager.createServiceDependency().setService(ClusterMessageService.class) + .setRequired(true)); + return component; + } + + private ServiceEndPoint serviceEndPointFromServiceReference(ServiceReference serviceReference) { + ServiceEndPoint serviceEndPoint = new ServiceEndPoint(); + serviceEndPoint.setClusterId(m_clusterMemberService.getClusterId()); + serviceEndPoint.setMemberId(m_clusterMemberService.getMemberId()); + serviceEndPoint.setObjectClass((String[]) 
serviceReference.getProperty("objectClass")); + serviceEndPoint.setOriginalServiceId((Long) serviceReference.getProperty("service.id")); + Hashtable<String, Object> properties = new Hashtable<String, Object>(); + for (String key : serviceReference.getPropertyKeys()) { + properties.put(key, serviceReference.getProperty(key)); + } + serviceEndPoint.setProperties(properties); + return serviceEndPoint; + } +} + +class ServiceReferenceComponentTuple { + private final ServiceReference m_serviceReference; + private final Component m_component; + + public ServiceReferenceComponentTuple(ServiceReference serviceReference, Component component) { + m_serviceReference = serviceReference; + m_component = component; + } + + public ServiceReference getServiceReference() { + return m_serviceReference; + } + + public Component getComponent() { + return m_component; + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/RemotableServiceEndPointImpl.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/RemotableServiceEndPointImpl.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,36 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.remote.service; + +import org.amdatu.core.fabric.remote.RemotableServiceEndpoint; +import org.amdatu.core.fabric.remote.ServiceEndPoint; + +/** + * I get published by the DistributionService for every remotable local endpoint + */ +public class RemotableServiceEndPointImpl implements RemotableServiceEndpoint { + + private final ServiceEndPoint m_serviceEndPoint; + + public RemotableServiceEndPointImpl(ServiceEndPoint serviceEndPoint) { + m_serviceEndPoint = serviceEndPoint; + } + + public ServiceEndPoint getServiceEndPoint() { + return m_serviceEndPoint; + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/RemoteServiceEndPointImpl.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/RemoteServiceEndPointImpl.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,36 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.remote.service; + +import org.amdatu.core.fabric.remote.RemoteServiceEndPoint; +import org.amdatu.core.fabric.remote.ServiceEndPoint; + +/** + * I get published by the DiscoveryService for every remote endpoint + */ +public class RemoteServiceEndPointImpl implements RemoteServiceEndPoint { + + private final ServiceEndPoint m_serviceEndPoint; + + public RemoteServiceEndPointImpl(ServiceEndPoint serviceEndPoint) { + m_serviceEndPoint = serviceEndPoint; + } + + public ServiceEndPoint getServiceEndPoint() { + return m_serviceEndPoint; + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/shell/DiscoveryListCommand.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/shell/DiscoveryListCommand.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,102 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.remote.shell; + +import java.io.PrintStream; +import java.util.StringTokenizer; + +import org.amdatu.core.fabric.cluster.ClusterMemberService; +import org.amdatu.core.fabric.remote.DiscoveryService; +import org.amdatu.core.fabric.remote.ServiceEndPoint; +import org.apache.felix.shell.Command; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; + +public class DiscoveryListCommand implements Command { + + BundleContext m_bundleContext; + + public String getName() { + return "dlist"; + } + + public String getShortDescription() { + return "List discovery state"; + } + + public String getUsage() { + return "dlist <memberid> <local|remote>"; + } + + public void execute(String s, PrintStream out, PrintStream err) { + + String serviceName = DiscoveryService.class.getName(); + + StringTokenizer st = new StringTokenizer(s, " "); + st.nextToken(); // ignore command itself + + if (st.countTokens() != 2) { + out.println(getUsage()); + return; + } + + String memberid = st.nextToken(); + String which = st.nextToken(); + + String methodName = "getLocalServiceEndPoints"; + if (which.equals("remote")) + methodName = "getRemoteServiceEndPoints"; + try { + Object serviceObject = null; + ServiceReference[] refs = m_bundleContext.getServiceReferences( + serviceName, "(" + + ClusterMemberService.CLUSTER_MEMBERID_PROP + "=" + memberid + ")"); + if (refs != null && refs.length > 0) { + serviceObject = m_bundleContext.getService(refs[0]); + } + if (serviceObject == null) { + out.println("Unable to locate service " + serviceName + " " + "(" + + ClusterMemberService.CLUSTER_MEMBERID_PROP + "=" + memberid + ")"); + return; + } + + DiscoveryService discoveryService = (DiscoveryService) serviceObject; + ServiceEndPoint[] serviceEndPoints = null; + if (which.equals("local")) { + serviceEndPoints = discoveryService.getLocalServiceEndPoints(); + out.println("Local service endpoints:"); + } + else { + if 
(which.equals("remote")) { + serviceEndPoints = discoveryService.getRemoteServiceEndPoints(); + out.println("Remote service endpoints:"); + } + else { + out.print(getUsage()); + return; + } + } + for (ServiceEndPoint sep : serviceEndPoints) { + out.println(sep.toString()); + } + } + catch (Exception e) { + out.println("Something went wrong :S"); + e.printStackTrace(out); + } + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/FabricTestService.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/FabricTestService.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,25 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.test; + +public interface FabricTestService { + + void registerRemotableService(); + + void unregisterRemotableService(); + +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/RemotableService.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/RemotableService.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,22 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.amdatu.core.fabric.test; + +public interface RemotableService { + + String hello(String name); +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/internal/RemotableServiceImpl.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/internal/RemotableServiceImpl.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,27 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.amdatu.core.fabric.test.internal; + +import org.amdatu.core.fabric.test.RemotableService; + +public class RemotableServiceImpl implements RemotableService { + + public String hello(String name) { + System.err.println("Saying hi to " + name); + return "Hi " + name; + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/service/FabricTestServiceImpl.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/service/FabricTestServiceImpl.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,53 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.amdatu.core.fabric.test.service; + +import java.util.Dictionary; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Map; + +import org.amdatu.core.fabric.remote.DistributionService; +import org.amdatu.core.fabric.test.RemotableService; +import org.amdatu.core.fabric.test.FabricTestService; +import org.amdatu.core.fabric.test.internal.RemotableServiceImpl; +import org.apache.felix.dm.Component; +import org.apache.felix.dm.DependencyManager; + +public class FabricTestServiceImpl implements FabricTestService { + + private final Map<String, Component> m_serviceComponents = new HashMap<String, Component>(); + private volatile DependencyManager m_dependencyManager; + + public void stop() { + if (m_serviceComponents.containsKey("c1")) + unregisterRemotableService(); + } + + public void registerRemotableService() { + Dictionary<String, Object> props = new Hashtable<String, Object>(); + props.put(DistributionService.REMOTABLE_PROP, "true"); + Component serviceComponent = m_dependencyManager.createComponent() + .setInterface(RemotableService.class.getName(), props).setImplementation(new RemotableServiceImpl()); + m_dependencyManager.add(serviceComponent); + m_serviceComponents.put("c1", serviceComponent); + } + + public void unregisterRemotableService() { + m_dependencyManager.remove(m_serviceComponents.remove("c1")); + } +} Added: sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/shell/FabricTestCommand.java ============================================================================== --- (empty file) +++ sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/test/shell/FabricTestCommand.java Fri Dec 17 16:41:34 2010 @@ -0,0 +1,93 @@ +/* + Copyright (C) 2010 Amdatu.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later 
version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.amdatu.core.fabric.test.shell; + +import java.io.PrintStream; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.StringTokenizer; + +import org.amdatu.core.fabric.test.FabricTestService; +import org.amdatu.core.fabric.test.RemotableService; +import org.apache.felix.shell.Command; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; + +public class FabricTestCommand implements Command { + + private volatile FabricTestService m_fabricTestService; + private volatile List<RemotableService> m_remotableServices = new LinkedList<RemotableService>(); + + BundleContext m_bundleContext; + + public void remotableServiceAdded(ServiceReference serviceReference, Object serviceObject) { + m_remotableServices.add((RemotableService) serviceObject); + } + + public void remotableServiceRemoved(ServiceReference serviceReference, Object serviceObject) { + m_remotableServices.remove((RemotableService) serviceObject); + } + + public String getName() { + return "ftest"; + } + + public String getShortDescription() { + return "Fabric test"; + } + + public String getUsage() { + return "ftest <register|unregister|call>"; + } + + public void execute(String s, PrintStream out, PrintStream err) { + + String serviceName = FabricTestService.class.getName(); + + StringTokenizer st = new StringTokenizer(s, " "); + st.nextToken(); // ignore command itself + + if (st.countTokens() != 1) { + out.println(getUsage()); + return; + } + + String command = st.nextToken(); + + if (command.equals("register")) { + 
m_fabricTestService.registerRemotableService(); + return; + } + + if (command.equals("unregister")) { + m_fabricTestService.unregisterRemotableService(); + return; + } + + if (command.equals("hello")) { + Random r = new Random(); + String[] names = new String[] { "Bram", "Ivo", "Mark", "Marcel", "Angelo", "Martijn", "Hans" }; + for (String name : names) { + int i = r.nextInt(m_remotableServices.size()); + String anwser = m_remotableServices.get(i).hello(name); + out.println("got: " + anwser); + } + return; + } + } +}
