[DOSGI-230] Add tcp provider
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/80b4dc28 Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/80b4dc28 Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/80b4dc28 Branch: refs/heads/master Commit: 80b4dc28f45ac01c12989db1ac8e063c6a2027a1 Parents: 610ef4a Author: Christian Schneider <[email protected]> Authored: Tue Mar 8 17:02:03 2016 +0100 Committer: Christian Schneider <[email protected]> Committed: Tue Mar 8 17:02:03 2016 +0100 ---------------------------------------------------------------------- .../features/src/main/resources/features.xml | 18 ++- distribution/multi-bundle/pom.xml | 2 + dsw/cxf-dosgi-tcp/pom.xml | 72 ++++++++++++ .../aries/rsa/provider/tcp/Activator.java | 42 +++++++ .../provider/tcp/LoaderObjectInputStream.java | 43 +++++++ .../aries/rsa/provider/tcp/LocalHostUtil.java | 92 +++++++++++++++ .../aries/rsa/provider/tcp/TCPProvider.java | 79 +++++++++++++ .../aries/rsa/provider/tcp/TCPServer.java | 117 +++++++++++++++++++ .../aries/rsa/provider/tcp/TcpEndpoint.java | 57 +++++++++ .../rsa/provider/tcp/TcpInvocationHandler.java | 66 +++++++++++ .../aries/rsa/provider/tcp/TcpProviderTest.java | 102 ++++++++++++++++ .../rsa/provider/tcp/myservice/MyService.java | 30 +++++ .../provider/tcp/myservice/MyServiceImpl.java | 36 ++++++ dsw/pom.xml | 1 + .../dosgi/systests2/multi/TestCustomIntent.java | 3 +- 15 files changed, 754 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/distribution/features/src/main/resources/features.xml ---------------------------------------------------------------------- diff --git a/distribution/features/src/main/resources/features.xml b/distribution/features/src/main/resources/features.xml index b39562b..d8f74d4 100644 --- a/distribution/features/src/main/resources/features.xml +++ b/distribution/features/src/main/resources/features.xml @@ -16,6 +16,19 @@ </feature> <feature name="cxf-dosgi-core" version="${project.version}"> + <bundle start-level="8">mvn:${project.groupId}/cxf-dosgi-ri-osgi-api/${project.version}</bundle> + <bundle>mvn:${project.groupId}/cxf-dosgi-ri-provider-api/${project.version}</bundle> + <bundle>mvn:${project.groupId}/cxf-dosgi-ri-rsa/${project.version}</bundle> + <bundle>mvn:${project.groupId}/cxf-dosgi-ri-topology-manager/${project.version}</bundle> + </feature> + + <feature name="cxf-dosgi-provider-tcp" version="${project.version}"> + <feature>cxf-dosgi-core</feature> + <bundle>mvn:${project.groupId}/cxf-dosgi-ri-tcp/${project.version}</bundle> + </feature> + + <feature name="cxf-dosgi-provider-cxf" version="${project.version}"> + <feature>cxf-dosgi-core</feature> <feature>cxf-specs</feature> <feature>cxf-jaxws</feature> <feature>cxf-jaxrs</feature> @@ -23,12 +36,9 @@ <feature>cxf-http-jetty</feature> <feature>cxf-http</feature> <feature>http</feature> - <bundle start-level="8">mvn:${project.groupId}/cxf-dosgi-ri-osgi-api/${project.version}</bundle> - <bundle>mvn:${project.groupId}/cxf-dosgi-ri-provider-api/${project.version}</bundle> - <bundle>mvn:${project.groupId}/cxf-dosgi-ri-rsa/${project.version}</bundle> - <bundle>mvn:${project.groupId}/cxf-dosgi-ri-topology-manager/${project.version}</bundle> <bundle>mvn:${project.groupId}/cxf-dosgi-ri-dsw-cxf/${project.version}</bundle> </feature> + <feature name="cxf-dosgi-discovery-local" version="${project.version}"> <feature>cxf-dosgi-core</feature> http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/distribution/multi-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/distribution/multi-bundle/pom.xml b/distribution/multi-bundle/pom.xml index 07d1f2d..1b835ff 100644 --- a/distribution/multi-bundle/pom.xml +++ b/distribution/multi-bundle/pom.xml @@ -66,6 +66,8 @@ </descriptors> <features> <feature>cxf-dosgi-base</feature> + <feature>cxf-dosgi-core</feature> + <feature>cxf-dosgi-provider-cxf</feature> <feature>cxf-dosgi-discovery-distributed</feature> <feature>cxf-dosgi-zookeeper-server</feature> </features> http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/dsw/cxf-dosgi-tcp/pom.xml ---------------------------------------------------------------------- diff --git a/dsw/cxf-dosgi-tcp/pom.xml b/dsw/cxf-dosgi-tcp/pom.xml new file mode 100644 index 0000000..2d2e5f3 --- /dev/null +++ b/dsw/cxf-dosgi-tcp/pom.xml @@ -0,0 +1,72 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.cxf.dosgi</groupId> + <artifactId>cxf-dosgi-ri-parent</artifactId> + <version>1.8-SNAPSHOT</version> + <relativePath>../../parent/pom.xml</relativePath> + </parent> + <artifactId>cxf-dosgi-ri-tcp</artifactId> + <packaging>bundle</packaging> + <name>CXF dOSGi Remote Service Admin TCP provider</name> + <description>Provider for Java Serialization over TCP</description> + + <properties> + <topDirectoryLocation>../..</topDirectoryLocation> + </properties> + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.compendium</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.cxf.dosgi</groupId> + <artifactId>cxf-dosgi-ri-provider-api</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymockclassextension</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <version>1.7.14</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <extensions>true</extensions> + <configuration> + <instructions> + <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName> + <Bundle-Activator>org.apache.cxf.dosgi.dsw.service.Activator</Bundle-Activator> + </instructions> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/Activator.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/Activator.java b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/Activator.java new file mode 100644 index 0000000..71d54f4 --- /dev/null +++ b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/Activator.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.tcp; + +import java.util.Dictionary; +import java.util.Hashtable; + +import org.apache.cxf.dosgi.dsw.api.DistributionProvider; +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; + +public class Activator implements BundleActivator { + + @Override + public void start(BundleContext context) throws Exception { + DistributionProvider provider = new TCPProvider(); + Dictionary<String, ?> props = new Hashtable<>(); + context.registerService(DistributionProvider.class, provider, props); + } + + @Override + public void stop(BundleContext context) throws Exception { + // unregister happens automatically + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/LoaderObjectInputStream.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/LoaderObjectInputStream.java b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/LoaderObjectInputStream.java new file mode 100644 index 0000000..ac60950 --- /dev/null +++ b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/LoaderObjectInputStream.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.tcp; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectStreamClass; + +public class LoaderObjectInputStream extends ObjectInputStream { + + private ClassLoader loader; + + public LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException { + super(in); + this.loader = loader; + } + + @Override + protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { + try { + return loader.loadClass(desc.getName()); + } catch (ClassNotFoundException e) { + return super.resolveClass(desc); + } + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/LocalHostUtil.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/LocalHostUtil.java b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/LocalHostUtil.java new file mode 100644 index 0000000..3f40bd8 --- /dev/null +++ b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/LocalHostUtil.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.tcp; + +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; + +/** + * Utility methods to get the local address even on a linux host. + */ +public final class LocalHostUtil { + + private LocalHostUtil() { + // Util Class + } + + /** + * Returns an InetAddress representing the address of the localhost. Every + * attempt is made to find an address for this host that is not the loopback + * address. If no other address can be found, the loopback will be returned. + * + * @return InetAddress the address of localhost + * @throws UnknownHostException if there is a problem determining the address + */ + public static InetAddress getLocalHost() throws UnknownHostException { + InetAddress localHost = InetAddress.getLocalHost(); + if (!localHost.isLoopbackAddress()) { + return localHost; + } + InetAddress[] addrs = getAllLocalUsingNetworkInterface(); + for (InetAddress addr : addrs) { + if (!addr.isLoopbackAddress() && !addr.getHostAddress().contains(":")) { + return addr; + } + } + return localHost; + } + + /** + * Utility method that delegates to the methods of NetworkInterface to + * determine addresses for this machine. + * + * @return all addresses found from the NetworkInterfaces + * @throws UnknownHostException if there is a problem determining addresses + */ + private static InetAddress[] getAllLocalUsingNetworkInterface() throws UnknownHostException { + try { + List<InetAddress> addresses = new ArrayList<InetAddress>(); + Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces(); + while (e.hasMoreElements()) { + NetworkInterface ni = e.nextElement(); + for (Enumeration<InetAddress> e2 = ni.getInetAddresses(); e2.hasMoreElements();) { + addresses.add(e2.nextElement()); + } + } + return addresses.toArray(new InetAddress[] {}); + } catch (SocketException ex) { + throw new UnknownHostException("127.0.0.1"); + } + } + + public static String getLocalIp() { + String localIP; + try { + localIP = getLocalHost().getHostAddress(); + } catch (Exception e) { + localIP = "localhost"; + } + return localIP; + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java new file mode 100644 index 0000000..9877f10 --- /dev/null +++ b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.tcp; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; +import java.net.URI; +import java.util.Map; + +import org.apache.cxf.dosgi.dsw.api.DistributionProvider; +import org.apache.cxf.dosgi.dsw.api.Endpoint; +import org.apache.cxf.dosgi.dsw.api.IntentUnsatisfiedException; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.osgi.framework.wiring.BundleWiring; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.RemoteConstants; + +@SuppressWarnings("rawtypes") +public class TCPProvider implements DistributionProvider { + + private static final String TCP_CONFIG_TYPE = "aries.tcp"; + + @Override + public String[] getSupportedTypes() { + return new String[] {TCP_CONFIG_TYPE}; + } + + Endpoint exportService(Object service, Map<String, Object> effectiveProperties, + Class[] exportedInterfaces) { + effectiveProperties.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, getSupportedTypes()); + return new TcpEndpoint(service, effectiveProperties); + } + + @Override + public Endpoint exportService(ServiceReference<?> sref, Map<String, Object> effectiveProperties, + Class[] exportedInterfaces) { + BundleContext serviceContext = sref.getBundle().getBundleContext(); + Object service = serviceContext.getService(sref); + return exportService(service, effectiveProperties, exportedInterfaces); + } + + public Object importEndpoint(ClassLoader cl, Class[] interfaces, + EndpointDescription endpoint) + throws IntentUnsatisfiedException { + try { + URI address = new URI(endpoint.getId()); + InvocationHandler handler = new TcpInvocationHandler(cl, address.getHost(), address.getPort()); + return Proxy.newProxyInstance(cl, interfaces, handler); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public Object importEndpoint(BundleContext consumerContext, Class[] interfaces, + EndpointDescription endpoint) + throws IntentUnsatisfiedException { + ClassLoader cl = consumerContext.getBundle().adapt(BundleWiring.class).getClassLoader(); + return importEndpoint(cl, interfaces, endpoint); + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java new file mode 100644 index 0000000..0eeebc9 --- /dev/null +++ b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.tcp; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class TCPServer implements Closeable, Runnable { + private ServerSocket serverSocket; + private Object service; + private boolean running; + private ExecutorService executor; + + public TCPServer(Object service, String localip, Integer port) { + this.service = service; + try { + this.serverSocket = new ServerSocket(port); + } catch (IOException e) { + throw new RuntimeException(e); + } + this.running = true; + this.executor = Executors.newCachedThreadPool(); + for (int c = 0; c < 100; c++) { + this.executor.execute(this); + } + } + + int getPort() { + return this.serverSocket.getLocalPort(); + } + + + + public void run() { + ClassLoader serviceCL = service.getClass().getClassLoader(); + while (running) { + try ( + Socket socket = this.serverSocket.accept(); + ObjectInputStream ois = new LoaderObjectInputStream(socket.getInputStream(), serviceCL); + ObjectOutputStream objectOutput = new ObjectOutputStream(socket.getOutputStream()) + ) { + String methodName = (String)ois.readObject(); + Object[] args = (Object[])ois.readObject(); + Object result = invoke(methodName, args); + objectOutput.writeObject(result); + } catch (SocketException e) { + running = false; + } catch (EOFException e) { + // This is normal + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + private Object invoke(String methodName, Object[] args) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { + Class<?>[] parameterTypesAr = getTypes(args); + Method method = service.getClass().getMethod(methodName, parameterTypesAr); + try { + return method.invoke(service, args); + } catch (Throwable e) { + return e; + } + } + + private Class<?>[] getTypes(Object[] args) { + List<Class<?>> parameterTypes = new ArrayList<>(); + for (Object arg : args) { + parameterTypes.add(arg.getClass()); + } + Class<?>[] parameterTypesAr = parameterTypes.toArray(new Class[]{}); + return parameterTypesAr; + } + + @Override + public void close() throws IOException { + this.serverSocket.close(); + this.running = false; + this.executor.shutdown(); + try { + this.executor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + } + this.executor.shutdownNow(); + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java new file mode 100644 index 0000000..b45347a --- /dev/null +++ b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.tcp; + +import java.io.IOException; +import java.util.Map; + +import org.apache.cxf.dosgi.dsw.api.Endpoint; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.RemoteConstants; + +public class TcpEndpoint implements Endpoint { + private EndpointDescription epd; + private TCPServer tcpServer; + + public TcpEndpoint(Object service, Map<String, Object> effectiveProperties) { + Integer port = getInt(effectiveProperties, "port"); + String localip = LocalHostUtil.getLocalIp(); + tcpServer = new TCPServer(service, localip, port); + effectiveProperties.put(RemoteConstants.ENDPOINT_ID, "tcp://" + localip + ":" + tcpServer.getPort()); + effectiveProperties.put(RemoteConstants.SERVICE_EXPORTED_CONFIGS, ""); + this.epd = new EndpointDescription(effectiveProperties); + } + + + private Integer getInt(Map<String, Object> effectiveProperties, String key) { + String value = (String)effectiveProperties.get(key); + return value != null ? Integer.parseInt(value) : 0; + } + + @Override + public EndpointDescription description() { + return this.epd; + } + + + @Override + public void close() throws IOException { + tcpServer.close(); + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java new file mode 100644 index 0000000..37ed11e --- /dev/null +++ b/dsw/cxf-dosgi-tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.tcp; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.net.Socket; +import java.net.UnknownHostException; + +public class TcpInvocationHandler implements InvocationHandler { + private String host; + private int port; + private ClassLoader cl; + + public TcpInvocationHandler(ClassLoader cl, String host, int port) + throws UnknownHostException, IOException { + this.cl = cl; + this.host = host; + this.port = port; + + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + try ( + Socket socket = new Socket(this.host, this.port); + ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()) + ) { + out.writeObject(method.getName()); + out.writeObject(args); + out.flush(); + return parseResult(socket); + } + } + + private Object parseResult(Socket socket) throws IOException, ClassNotFoundException, Throwable { + try (ObjectInputStream in = new LoaderObjectInputStream(socket.getInputStream(), cl)) { + Object result = in.readObject(); + if (result instanceof Throwable) { + throw (Throwable)result; + } else { + return result; + } + } + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/dsw/cxf-dosgi-tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-dosgi-tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java b/dsw/cxf-dosgi-tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java new file mode 100644 index 0000000..570bc85 --- /dev/null +++ b/dsw/cxf-dosgi-tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.tcp; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.aries.rsa.provider.tcp.myservice.MyService; +import org.apache.aries.rsa.provider.tcp.myservice.MyServiceImpl; +import org.apache.cxf.dosgi.dsw.api.Endpoint; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.osgi.framework.Constants; + +public class TcpProviderTest { + + private static final int NUM_CALLS = 10000; + private MyService myServiceProxy; + private Endpoint ep; + + @Before + public void createServerAndProxy() { + TCPProvider provider = new TCPProvider(); + Map<String, Object> effectiveProperties = new HashMap<String, Object>(); + effectiveProperties.put(Constants.OBJECTCLASS, new String[] {MyService.class.getName()}); + Class<?>[] exportedInterfaces = new Class[] {MyService.class}; + MyService myService = new MyServiceImpl(); + ep = provider.exportService(myService, effectiveProperties, exportedInterfaces); + myServiceProxy = (MyService)provider.importEndpoint(MyService.class.getClassLoader(), + exportedInterfaces, ep.description()); + } + + @Test + public void testPerf() throws IOException, InterruptedException { + //runPerfTest(myServiceProxy); + String msg = "test"; + String result = myServiceProxy.echo(msg); + Assert.assertEquals(msg, result); + } + + @Test + public void testCall() throws IOException, InterruptedException { + myServiceProxy.call("test"); + } + + @Test + public void testCallOneway() throws IOException, InterruptedException { + myServiceProxy.callOneWay("test"); + } + + @After + public void close() throws IOException { + ep.close(); + } + + private void runPerfTest(final MyService myServiceProxy2) throws InterruptedException { + StringBuilder msg = new StringBuilder(); + for (int c = 0; c < 1000; c++) { + msg.append("testing123"); + } + final String msg2 = msg.toString(); + ExecutorService executor = Executors.newFixedThreadPool(100); + Runnable task = new Runnable() { + + @Override + public void run() { + String result = myServiceProxy2.echo(msg2); + Assert.assertEquals(msg2, result); + } + }; + long start = System.currentTimeMillis(); + for (int c = 0; c < NUM_CALLS; c++) { + executor.execute(task); + } + executor.shutdown(); + executor.awaitTermination(100, TimeUnit.SECONDS); + long tps = NUM_CALLS * 1000 / (System.currentTimeMillis() - start); + System.out.println(tps); + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/dsw/cxf-dosgi-tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-dosgi-tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java b/dsw/cxf-dosgi-tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java new file mode 100644 index 0000000..e9d56bf --- /dev/null +++ b/dsw/cxf-dosgi-tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.tcp.myservice; + +import javax.jws.Oneway; + +public interface MyService { + String echo(String msg); + void call(String msg); + + // Oneway not yet supported + @Oneway + void callOneWay(String msg); +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/dsw/cxf-dosgi-tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-dosgi-tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java b/dsw/cxf-dosgi-tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java new file mode 100644 index 0000000..5f469ed --- /dev/null +++ b/dsw/cxf-dosgi-tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.tcp.myservice; + +public class MyServiceImpl implements MyService { + + @Override + public String echo(String msg) { + return msg; + } + + @Override + public void call(String msg) { + } + + @Override + public void callOneWay(String msg) { + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/dsw/pom.xml ---------------------------------------------------------------------- diff --git a/dsw/pom.xml b/dsw/pom.xml index 5913585..375d8fc 100644 --- a/dsw/pom.xml +++ b/dsw/pom.xml @@ -41,5 +41,6 @@ <module>cxf-dosgi-provider-api</module> <module>cxf-dosgi-rsa</module> <module>cxf-dsw</module> + <module>cxf-dosgi-tcp</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/80b4dc28/systests2/multi-bundle/src/test/java/org/apache/cxf/dosgi/systests2/multi/TestCustomIntent.java ---------------------------------------------------------------------- diff --git a/systests2/multi-bundle/src/test/java/org/apache/cxf/dosgi/systests2/multi/TestCustomIntent.java b/systests2/multi-bundle/src/test/java/org/apache/cxf/dosgi/systests2/multi/TestCustomIntent.java index 2200567..04ddcdc 100644 --- a/systests2/multi-bundle/src/test/java/org/apache/cxf/dosgi/systests2/multi/TestCustomIntent.java +++ b/systests2/multi-bundle/src/test/java/org/apache/cxf/dosgi/systests2/multi/TestCustomIntent.java @@ -23,8 +23,6 @@ import java.util.Map; import javax.inject.Inject; -import junit.framework.Assert; - import org.apache.cxf.dosgi.samples.greeter.GreeterService; import org.apache.cxf.dosgi.samples.greeter.GreetingPhrase; import org.apache.cxf.dosgi.systests2.multi.customintent.AddGreetingPhraseInterceptor; @@ -33,6 +31,7 @@ import org.apache.cxf.dosgi.systests2.multi.customintent.CustomIntentActivator; import org.apache.cxf.dosgi.systests2.multi.customintent.service.EmptyGreeterService; import org.apache.cxf.dosgi.systests2.multi.customintent.service.GreeterServiceWithCustomIntentActivator; import org.apache.cxf.frontend.ClientProxyFactoryBean; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.ops4j.pax.exam.Configuration;
