[ARIES-1526] Add a fast nio/protobuf provider Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/62d835de Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/62d835de Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/62d835de
Branch: refs/heads/master Commit: 62d835dec07d9592b1ea3fe038cea39b1e9af9aa Parents: 58bb694 Author: Guillaume Nodet <[email protected]> Authored: Mon Apr 11 12:08:14 2016 +0200 Committer: Guillaume Nodet <[email protected]> Committed: Mon Apr 11 14:04:42 2016 +0200 ---------------------------------------------------------------------- features/src/main/resources/features.xml | 9 +- parent/pom.xml | 19 +- provider/fastbin/bnd.bnd | 1 + provider/fastbin/pom.xml | 62 ++ .../aries/rsa/provider/fastbin/Activator.java | 66 ++ .../rsa/provider/fastbin/BaseActivator.java | 290 +++++++ .../rsa/provider/fastbin/FastBinProvider.java | 146 ++++ .../rsa/provider/fastbin/api/AsyncCallback.java | 26 + .../fastbin/api/AsyncCallbackFuture.java | 43 + .../rsa/provider/fastbin/api/Dispatched.java | 31 + .../api/ObjectSerializationStrategy.java | 73 ++ .../api/ProtobufSerializationStrategy.java | 127 +++ .../rsa/provider/fastbin/api/Serialization.java | 29 + .../fastbin/api/SerializationStrategy.java | 38 + .../rsa/provider/fastbin/io/ClientInvoker.java | 24 + .../rsa/provider/fastbin/io/ProtocolCodec.java | 97 +++ .../rsa/provider/fastbin/io/ServerInvoker.java | 34 + .../aries/rsa/provider/fastbin/io/Service.java | 52 ++ .../rsa/provider/fastbin/io/Transport.java | 95 +++ .../fastbin/io/TransportAcceptListener.java | 31 + .../provider/fastbin/io/TransportListener.java | 55 ++ .../provider/fastbin/io/TransportServer.java | 73 ++ .../fastbin/tcp/AsyncInvocationStrategy.java | 188 +++++ .../fastbin/tcp/BlockingInvocationStrategy.java | 126 +++ .../provider/fastbin/tcp/ClientInvokerImpl.java | 327 ++++++++ .../fastbin/tcp/InvocationStrategy.java | 34 + .../fastbin/tcp/LengthPrefixedCodec.java | 175 ++++ .../provider/fastbin/tcp/ResponseFuture.java | 31 + .../provider/fastbin/tcp/ServerInvokerImpl.java | 314 +++++++ .../rsa/provider/fastbin/tcp/TcpTransport.java | 828 +++++++++++++++++++ .../fastbin/tcp/TcpTransportFactory.java | 123 +++ .../fastbin/tcp/TcpTransportServer.java | 231 ++++++ .../rsa/provider/fastbin/tcp/TransportPool.java | 256 ++++++ .../util/ClassLoaderObjectInputStream.java | 86 ++ .../fastbin/util/IntrospectionSupport.java | 362 ++++++++ .../provider/fastbin/util/StringSupport.java | 40 + .../rsa/provider/fastbin/util/URISupport.java | 332 ++++++++ .../provider/fastbin/util/UuidGenerator.java | 178 ++++ .../rsa/provider/fastbin/InvocationTest.java | 581 +++++++++++++ .../aries/rsa/provider/fastbin/ManagerTest.java | 106 +++ .../provider/fastbin/TransportFailureTest.java | 128 +++ .../fastbin/tcp/LengthPrefixedCodecTest.java | 150 ++++ provider/fastbin/src/test/proto/example.proto | 28 + .../fastbin/src/test/resources/log4j.properties | 35 + provider/pom.xml | 1 + 45 files changed, 6079 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/features/src/main/resources/features.xml ---------------------------------------------------------------------- diff --git a/features/src/main/resources/features.xml b/features/src/main/resources/features.xml index 1f9aea0..ba9bc93 100644 --- a/features/src/main/resources/features.xml +++ b/features/src/main/resources/features.xml @@ -11,7 +11,14 @@ <feature>aries-rsa-core</feature> <bundle>mvn:org.apache.aries.rsa.provider/org.apache.aries.rsa.provider.tcp/${project.version}</bundle> </feature> - + + <feature name="aries-rsa-provider-fastbin" version="${project.version}"> + <feature>aries-rsa-core</feature> + <bundle>mvn:org.fusesource.hawtdispatch/hawtdispatch/${hawtdispatch.version}</bundle> + <bundle>mvn:org.fusesource.hawtbuf/hawtbuf/${hawtbuf.version}</bundle> + <bundle>mvn:org.apache.aries.rsa.provider/org.apache.aries.rsa.provider.fastbin/${project.version}</bundle> + </feature> + <feature name="aries-rsa-discovery-local" version="${project.version}"> <feature>aries-rsa-core</feature> <bundle>mvn:org.apache.aries.rsa.discovery/org.apache.aries.rsa.discovery.local/${project.version}</bundle> http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index bdef21b..3ffada2 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -33,6 +33,8 @@ <rsa.version>1.0.0</rsa.version> <slf4j.version>1.7.14</slf4j.version> <log4j.version>1.2.6</log4j.version> + <hawtdispatch.version>1.21</hawtdispatch.version> + <hawtbuf.version>1.11</hawtbuf.version> <exam.version>4.8.0</exam.version> <cxf.resources.base.path /> <cxf.checkstyle.extension /> @@ -104,7 +106,22 @@ <artifactId>org.apache.aries.rsa.spi</artifactId> <version>${project.version}</version> </dependency> - + <dependency> + <groupId>org.fusesource.hawtdispatch</groupId> + <artifactId>hawtdispatch</artifactId> + <version>${hawtdispatch.version}</version> + </dependency> + <dependency> + <groupId>org.fusesource.hawtbuf</groupId> + <artifactId>hawtbuf</artifactId> + <version>${hawtbuf.version}</version> + </dependency> + <dependency> + <groupId>org.fusesource.hawtbuf</groupId> + <artifactId>hawtbuf-proto</artifactId> + <version>${hawtbuf.version}</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/bnd.bnd ---------------------------------------------------------------------- diff --git a/provider/fastbin/bnd.bnd b/provider/fastbin/bnd.bnd new file mode 100644 index 0000000..5249b00 --- /dev/null +++ b/provider/fastbin/bnd.bnd @@ -0,0 +1 @@ +Bundle-Activator: org.apache.aries.rsa.provider.fastbin.Activator http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/pom.xml ---------------------------------------------------------------------- diff --git a/provider/fastbin/pom.xml b/provider/fastbin/pom.xml new file mode 100644 index 0000000..a748035 --- /dev/null +++ b/provider/fastbin/pom.xml @@ -0,0 +1,62 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.aries.rsa</groupId> + <artifactId>org.apache.aries.rsa.parent</artifactId> + <version>1.9-SNAPSHOT</version> + <relativePath>../../parent/pom.xml</relativePath> + </parent> + + <groupId>org.apache.aries.rsa.provider</groupId> + <artifactId>org.apache.aries.rsa.provider.fastbin</artifactId> + <packaging>bundle</packaging> + <name>Aries Remote Service Admin provider FastBin</name> + <description>Provider for Java Serialization over FastBin</description> + + <properties> + <topDirectoryLocation>../..</topDirectoryLocation> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.aries.rsa</groupId> + <artifactId>org.apache.aries.rsa.spi</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.fusesource.hawtdispatch</groupId> + <artifactId>hawtdispatch</artifactId> + </dependency> + <dependency> + <groupId>org.fusesource.hawtbuf</groupId> + <artifactId>hawtbuf</artifactId> + </dependency> + <!-- this is only needed you you want to use the ProtobufSerializationStrategy --> + <dependency> + <groupId>org.fusesource.hawtbuf</groupId> + <artifactId>hawtbuf-proto</artifactId> + <optional>true</optional> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.fusesource.hawtbuf</groupId> + <artifactId>hawtbuf-protoc</artifactId> + <version>${hawtbuf.version}</version> + <configuration> + <type>alt</type> + </configuration> + <executions> + <execution> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java new file mode 100644 index 0000000..b89de14 --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.fastbin; + +import java.util.Dictionary; +import java.util.Hashtable; +import java.util.concurrent.TimeUnit; + +import org.apache.aries.rsa.provider.fastbin.util.UuidGenerator; +import org.apache.aries.rsa.spi.DistributionProvider; +import org.osgi.service.cm.ManagedService; +import org.osgi.service.remoteserviceadmin.RemoteConstants; + +public class Activator extends BaseActivator implements ManagedService { + + private FastBinProvider provider; + + @Override + protected void doOpen() throws Exception { + manage("org.apache.aries.rsa.provider.fastbin"); + } + + @Override + protected void doStart() throws Exception { + String uri = getString("uri", "tcp://0.0.0.0:2543"); + String exportedAddress = getString("exportedAddress", null); + if (exportedAddress == null) { + exportedAddress = UuidGenerator.getHostName(); + } + long timeout = getLong("timeout", TimeUnit.MINUTES.toMillis(5)); + provider = new FastBinProvider(uri, exportedAddress, timeout); + Dictionary<String, Object> props = new Hashtable<>(); + props.put(RemoteConstants.REMOTE_INTENTS_SUPPORTED, new String[]{}); + props.put(RemoteConstants.REMOTE_CONFIGS_SUPPORTED, provider.getSupportedTypes()); + register(DistributionProvider.class, provider, props); + } + + @Override + protected void doStop() { + super.doStop(); + if (provider != null) { + try { + provider.close(); + } finally { + provider = null; + } + } + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/BaseActivator.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/BaseActivator.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/BaseActivator.java new file mode 100644 index 0000000..6b7b6fe --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/BaseActivator.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin; + +import java.io.InputStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.Dictionary; +import java.util.Hashtable; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.framework.ServiceRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BaseActivator implements BundleActivator, Runnable { + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + protected BundleContext bundleContext; + + protected ExecutorService executor = new ThreadPoolExecutor(0, 1, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>()); + private AtomicBoolean scheduled = new AtomicBoolean(); + + private long schedulerStopTimeout = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS); + + private List<ServiceRegistration> registrations; + private ServiceRegistration managedServiceRegistration; + private Dictionary<String, ?> configuration; + + public long getSchedulerStopTimeout() { + return schedulerStopTimeout; + } + + public void setSchedulerStopTimeout(long schedulerStopTimeout) { + this.schedulerStopTimeout = schedulerStopTimeout; + } + + @Override + public void start(BundleContext context) throws Exception { + bundleContext = context; + scheduled.set(true); + doOpen(); + scheduled.set(false); + if (managedServiceRegistration == null) { + try { + doStart(); + } catch (Exception e) { + logger.warn("Error starting activator", e); + doStop(); + } + } else { + reconfigure(); + } + } + + @Override + public void stop(BundleContext context) throws Exception { + scheduled.set(true); + doClose(); + executor.shutdown(); + executor.awaitTermination(schedulerStopTimeout, TimeUnit.MILLISECONDS); + doStop(); + } + + protected void doOpen() throws Exception { + URL data = bundleContext.getBundle().getResource("OSGI-INF/karaf-tracker/" + getClass().getName()); + if (data != null) { + Properties props = new Properties(); + try (InputStream is = data.openStream()) { + props.load(is); + } + for (String key : props.stringPropertyNames()) { + if ("pid".equals(key)) { + manage(props.getProperty(key)); + } + } + } + } + + protected void doClose() { + if (managedServiceRegistration != null) { + managedServiceRegistration.unregister(); + } + } + + protected void doStart() throws Exception { + } + + protected void doStop() { + if (registrations != null) { + for (ServiceRegistration reg : registrations) { + reg.unregister(); + } + registrations = null; + } + } + + /** + * Called in {@link #doOpen()}. + * + * @param pid The configuration PID to manage (ManagedService). + */ + protected void manage(String pid) { + Hashtable<String, Object> props = new Hashtable<>(); + props.put(Constants.SERVICE_PID, pid); + managedServiceRegistration = bundleContext.registerService( + "org.osgi.service.cm.ManagedService", this, props); + } + + public void updated(Dictionary<String, ?> properties) { + this.configuration = properties; + reconfigure(); + } + + protected Dictionary<String, ?> getConfiguration() { + return configuration; + } + + /** + * Called in {@link #doStart()}. + * + * @param key The configuration key + * @param def The default value. + * @return The value of the configuration key if found, the default value else. + */ + protected int getInt(String key, int def) { + if (configuration != null) { + Object val = configuration.get(key); + if (val instanceof Number) { + return ((Number) val).intValue(); + } else if (val != null) { + return Integer.parseInt(val.toString()); + } + } + return def; + } + + /** + * Called in {@link #doStart()}. + * + * @param key The configuration key. + * @param def The default value. + * @return The value of the configuration key if found, the default value else. + */ + protected boolean getBoolean(String key, boolean def) { + if (configuration != null) { + Object val = configuration.get(key); + if (val instanceof Boolean) { + return (Boolean) val; + } else if (val != null) { + return Boolean.parseBoolean(val.toString()); + } + } + return def; + } + + /** + * Called in {@link #doStart()}. + * + * @param key The configuration key. + * @param def The default value. + * @return The value of the configuration key if found, the default value else. + */ + protected long getLong(String key, long def) { + if (configuration != null) { + Object val = configuration.get(key); + if (val instanceof Number) { + return ((Number) val).longValue(); + } else if (val != null) { + return Long.parseLong(val.toString()); + } + } + return def; + } + + /** + * Called in {@link #doStart()}. + * + * @param key The configuration key. + * @param def The default value. + * @return The value of the configuration key if found, the default value else. + */ + protected String getString(String key, String def) { + if (configuration != null) { + Object val = configuration.get(key); + if (val != null) { + return val.toString(); + } + } + return def; + } + + protected void reconfigure() { + if (scheduled.compareAndSet(false, true)) { + executor.submit(this); + } + } + + @Override + public void run() { + scheduled.set(false); + doStop(); + try { + doStart(); + } catch (Exception e) { + logger.warn("Error starting activator", e); + doStop(); + } + } + + /** + * Called in {@link #doStart()}. + * + * @param clazz The service interface to register. + * @param <T> The service type. + * @param service The actual service instance to register. + */ + protected <T> void register(Class<T> clazz, T service) { + register(clazz, service, null); + } + + /** + * Called in {@link #doStart()}. + * + * @param clazz The service interface to register. + * @param <T> The service type. + * @param service The actual service instance to register. + * @param props The service properties to register. + */ + protected <T> void register(Class<T> clazz, T service, Dictionary<String, ?> props) { + trackRegistration(bundleContext.registerService(clazz, service, props)); + } + + /** + * Called in {@link #doStart()}. + * + * @param clazz The service interfaces to register. + * @param service The actual service instance to register. + */ + protected void register(Class[] clazz, Object service) { + register(clazz, service, null); + } + + /** + * Called in {@link #doStart()}. + * + * @param clazz The service interfaces to register. + * @param service The actual service instance to register. + * @param props The service properties to register. + */ + protected void register(Class[] clazz, Object service, Dictionary<String, ?> props) { + String[] names = new String[clazz.length]; + for (int i = 0; i < clazz.length; i++) { + names[i] = clazz[i].getName(); + } + trackRegistration(bundleContext.registerService(names, service, props)); + } + + private void trackRegistration(ServiceRegistration registration) { + if (registrations == null) { + registrations = new ArrayList<>(); + } + registrations.add(registration); + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java new file mode 100644 index 0000000..476a699 --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java @@ -0,0 +1,146 @@ +package org.apache.aries.rsa.provider.fastbin; + +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; +import java.net.URI; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy; +import org.apache.aries.rsa.provider.fastbin.io.ClientInvoker; +import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker; +import org.apache.aries.rsa.provider.fastbin.tcp.ClientInvokerImpl; +import org.apache.aries.rsa.provider.fastbin.tcp.ServerInvokerImpl; +import org.apache.aries.rsa.provider.fastbin.util.UuidGenerator; +import org.apache.aries.rsa.spi.DistributionProvider; +import org.apache.aries.rsa.spi.Endpoint; +import org.apache.aries.rsa.spi.IntentUnsatisfiedException; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; +import org.osgi.framework.BundleContext; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.RemoteConstants; + +public class FastBinProvider implements DistributionProvider { + + public static final String FASTBIN_CONFIG_TYPE = "aries.fastbin"; + + public static final String FASTBIN_ADDRESS = FASTBIN_CONFIG_TYPE + ".address"; + + private final String uri; + private final String exportedAddress; + private final long timeout; + + private final DispatchQueue queue = Dispatch.createQueue(); + private final Map<String, SerializationStrategy> serializationStrategies = new ConcurrentHashMap<>(); + + private ClientInvoker client; + private ServerInvoker server; + + public FastBinProvider(java.lang.String uri, java.lang.String exportedAddress, long timeout) throws Exception { + this.uri = uri; + this.exportedAddress = exportedAddress; + this.timeout = timeout; + // Create client and server + this.client = new ClientInvokerImpl(queue, timeout, serializationStrategies); + this.server = new ServerInvokerImpl(uri, queue, serializationStrategies); + this.client.start(); + this.server.start(); + } + + public void close() { + client.stop(); + server.stop(); + } + + @Override + public String[] getSupportedTypes() { + return new String[] {FASTBIN_CONFIG_TYPE}; + } + + @Override + public Endpoint exportService(final Object serviceO, + BundleContext serviceContext, + Map<String, Object> effectiveProperties, + Class[] exportedInterfaces) { + + // Compute properties + /* + Map<String, Object> properties = new TreeMap<String, Object>(String.CASE_INSENSITIVE_ORDER); + for (String k : reference.getPropertyKeys()) { + properties.put(k, reference.getProperty(k)); + } + // Bail out if there is any intents specified, we don't support any + Set<String> intents = Utils.normalize(properties.get(SERVICE_EXPORTED_INTENTS)); + Set<String> extraIntents = Utils.normalize(properties.get(SERVICE_EXPORTED_INTENTS_EXTRA)); + if (!intents.isEmpty() || !extraIntents.isEmpty()) { + throw new UnsupportedOperationException(); + } + // Bail out if there are any configurations specified, we don't support any + Set<String> configs = Utils.normalize(properties.get(SERVICE_EXPORTED_CONFIGS)); + if (configs.isEmpty()) { + configs.add(CONFIG); + } else if (!configs.contains(CONFIG)) { + throw new UnsupportedOperationException(); + } + + URI connectUri = new URI(this.server.getConnectAddress()); + String fabricAddress = connectUri.getScheme() + "://" + exportedAddress + ":" + connectUri.getPort(); + + properties.remove(SERVICE_EXPORTED_CONFIGS); + properties.put(SERVICE_IMPORTED_CONFIGS, new String[] { CONFIG }); + properties.put(ENDPOINT_FRAMEWORK_UUID, this.uuid); + properties.put(FABRIC_ADDRESS, fabricAddress); + + String uuid = UuidGenerator.getUUID(); + properties.put(ENDPOINT_ID, uuid); + */ + + String endpointId = UuidGenerator.getUUID(); + effectiveProperties.put(RemoteConstants.ENDPOINT_ID, endpointId); + + URI connectUri = URI.create(this.server.getConnectAddress()); + String fastbinAddress = connectUri.getScheme() + "://" + exportedAddress + ":" + connectUri.getPort(); + effectiveProperties.put(FASTBIN_ADDRESS, fastbinAddress); + effectiveProperties.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, getSupportedTypes()); + + // Now, export the service + final EndpointDescription description = new EndpointDescription(effectiveProperties); + + // Export it + server.registerService(description.getId(), new ServerInvoker.ServiceFactory() { + public Object get() { + return serviceO; + } + public void unget() { + } + }, serviceO.getClass().getClassLoader()); + + return new Endpoint() { + @Override + public EndpointDescription description() { + return description; + } + + @Override + public void close() throws IOException { + server.unregisterService(description.getId()); + } + }; + } + + @Override + public Object importEndpoint(ClassLoader cl, + BundleContext consumerContext, + Class[] interfaces, + EndpointDescription endpoint) + throws IntentUnsatisfiedException { + + String address = (String) endpoint.getProperties().get(FASTBIN_ADDRESS); + InvocationHandler handler = client.getProxy(address, endpoint.getId(), cl); + return Proxy.newProxyInstance(cl, interfaces, handler); + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallback.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallback.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallback.java new file mode 100644 index 0000000..5ca7bbd --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallback.java @@ -0,0 +1,26 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.api; + +/** + * <p> + * </p> + * + */ +public interface AsyncCallback<T> { + void onSuccess(T result); + void onFailure(Throwable failure); +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallbackFuture.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallbackFuture.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallbackFuture.java new file mode 100644 index 0000000..71906bd --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallbackFuture.java @@ -0,0 +1,43 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.api; + +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +/** + * <p> + * </p> + * + */ +public class AsyncCallbackFuture<T> extends FutureTask<T> implements AsyncCallback<T> { + + public AsyncCallbackFuture() { + super(new Callable<T>() { + public T call() { + return null; + } + }); + } + + public void onSuccess(T result) { + super.set(result); + } + + public void onFailure(Throwable failure) { + super.setException(failure); + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Dispatched.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Dispatched.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Dispatched.java new file mode 100644 index 0000000..77864d1 --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Dispatched.java @@ -0,0 +1,31 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.api; + +import org.fusesource.hawtdispatch.DispatchQueue; + +/** + * <p> + * Implemented by object which expect to be call from the execution context + * of a dispatch queue. + * </p> + * + */ +public interface Dispatched { + + DispatchQueue queue(); + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ObjectSerializationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ObjectSerializationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ObjectSerializationStrategy.java new file mode 100644 index 0000000..c5762ca --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ObjectSerializationStrategy.java @@ -0,0 +1,73 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.api; + +import java.io.IOException; +import java.io.ObjectOutputStream; + +import org.apache.aries.rsa.provider.fastbin.util.ClassLoaderObjectInputStream; +import org.fusesource.hawtbuf.DataByteArrayInputStream; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; + +/** + * <p> + * </p> + * + */ +public class ObjectSerializationStrategy implements SerializationStrategy { + public static final ObjectSerializationStrategy INSTANCE = new ObjectSerializationStrategy(); + + public String name() { + return "object"; + } + + public void encodeRequest(ClassLoader loader, Class<?>[] types, Object[] args, DataByteArrayOutputStream target) throws IOException { + ObjectOutputStream oos = new ObjectOutputStream(target); + oos.writeObject(args); + oos.flush(); + } + + public void decodeResponse(ClassLoader loader, Class<?> type, DataByteArrayInputStream source, AsyncCallback result) throws IOException, ClassNotFoundException { + ClassLoaderObjectInputStream ois = new ClassLoaderObjectInputStream(source); + ois.setClassLoader(loader); + Throwable error = (Throwable) ois.readObject(); + Object value = ois.readObject(); + if (error != null) { + result.onFailure(error); + } else { + result.onSuccess(value); + } + } + + public void decodeRequest(ClassLoader loader, Class<?>[] types, DataByteArrayInputStream source, Object[] target) throws IOException, ClassNotFoundException { + final ClassLoaderObjectInputStream ois = new ClassLoaderObjectInputStream(source); + ois.setClassLoader(loader); + final Object[] args = (Object[]) ois.readObject(); + if( args!=null ) { + System.arraycopy(args, 0, target, 0, args.length); + } + } + + + public void encodeResponse(ClassLoader loader, Class<?> type, Object value, Throwable error, DataByteArrayOutputStream target) throws IOException, ClassNotFoundException { + ObjectOutputStream oos = new ObjectOutputStream(target); + oos.writeObject(error); + oos.writeObject(value); + oos.flush(); + } + + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ProtobufSerializationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ProtobufSerializationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ProtobufSerializationStrategy.java new file mode 100644 index 0000000..c53c65d --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ProtobufSerializationStrategy.java @@ -0,0 +1,127 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.api; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +import org.fusesource.hawtbuf.DataByteArrayInputStream; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.fusesource.hawtbuf.proto.PBMessage; +import org.fusesource.hawtbuf.proto.PBMessageFactory; + +/** + * <p> + * </p> + * + */ +public class ProtobufSerializationStrategy implements SerializationStrategy { + + public static final ProtobufSerializationStrategy INSTANCE = new ProtobufSerializationStrategy(); + + public String name() { + return "protobuf"; + } + + private void encodeProtobuf(Class<?> type, Object arg, DataByteArrayOutputStream target) throws IOException { + if( !PBMessage.class.isAssignableFrom(type) ) { + throw new IllegalArgumentException("Invalid "+name()+" serialization method: method argument not a "+PBMessage.class.getName()); + } + PBMessage msg = (PBMessage) arg; + if( msg==null ) { + return; + } + msg.freeze().writeUnframed(target); + } + + private Object decodeProtobuf(Class<?> type, DataByteArrayInputStream source) throws IllegalAccessException, NoSuchFieldException, IOException { + if( !PBMessage.class.isAssignableFrom(type) ) { + throw new IllegalArgumentException("Invalid "+name()+" serialization method: method argument not a "+PBMessage.class.getName()); + } + + // Get the factory instance... + PBMessageFactory factory = (PBMessageFactory) type.getEnclosingClass().getField("FACTORY").get(null); + PBMessage msg = factory.parseUnframed(source); + String name = type.getName(); + Object rc; + if( name.endsWith("$Getter") || name.endsWith("$Buffer") ) { + // Interface is ok we us giving them a read only impl. + rc = msg; + } else { + // They want a read/write impl. + rc = msg.copy(); + } + return rc; + } + + public void encodeRequest(ClassLoader loader, Class<?>[] types, Object[] args, DataByteArrayOutputStream target) throws IOException { + if( types.length == 0 ) { + return; + } else if( types.length == 1 ) { + encodeProtobuf(types[0], args[0], target); + } else { + throw new IllegalArgumentException("Invalid "+name()+" serialization method: methods must have zero or one argument."); + } + } + + public void decodeRequest(ClassLoader loader, Class<?>[] types, DataByteArrayInputStream source, Object[] target) throws IOException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException { + if( types.length == 0 ) { + return; + } else if( types.length == 1 ) { + target[0] = decodeProtobuf(types[0], source); + } else { + throw new IllegalArgumentException("Invalid "+name()+" serialization method: methods must have zero or one argument."); + } + } + + public void encodeResponse(ClassLoader loader, Class<?> type, Object value, Throwable error, DataByteArrayOutputStream target) throws IOException, ClassNotFoundException { + if( error!=null ) { + target.writeBoolean(true); + target.writeUTF(error.getClass().getName()); + target.writeUTF(error.getMessage()); + } else { + target.writeBoolean(false); + encodeProtobuf(type, value, target); + } + } + + public void decodeResponse(ClassLoader loader, Class<?> type, DataByteArrayInputStream source, AsyncCallback result) throws IOException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException { + if( source.readBoolean() ) { + String className = source.readUTF(); + String message = source.readUTF(); + + Throwable error; + try { + // try to build the exception... + Constructor<?> ctr = loader.loadClass(className).getConstructor(new Class[]{String.class}); + error = (Throwable) ctr.newInstance(message); + } catch (Throwable e) { + // fallback to something simple.. + error = new RuntimeException(className+": "+message); + } + result.onFailure(error); + + } else { + result.onSuccess(decodeProtobuf(type, source)); + } + + } + + + + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Serialization.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Serialization.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Serialization.java new file mode 100644 index 0000000..3650085 --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Serialization.java @@ -0,0 +1,29 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.api; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD}) +public @interface Serialization { + String value(); +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/SerializationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/SerializationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/SerializationStrategy.java new file mode 100644 index 0000000..d5ac175 --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/SerializationStrategy.java @@ -0,0 +1,38 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.api; + +import org.fusesource.hawtbuf.DataByteArrayInputStream; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; + +/** + * <p> + * </p> + * + */ +public interface SerializationStrategy { + + String name(); + + void encodeRequest(ClassLoader loader, Class<?>[] types, Object[] args, DataByteArrayOutputStream target) throws Exception; + + void decodeResponse(ClassLoader loader, Class<?> type, DataByteArrayInputStream source, AsyncCallback result) throws Exception; + + void decodeRequest(ClassLoader loader, Class<?>[] types, DataByteArrayInputStream source, Object[] target) throws Exception; + + void encodeResponse(ClassLoader loader, Class<?> type, Object value, Throwable error, DataByteArrayOutputStream target) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ClientInvoker.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ClientInvoker.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ClientInvoker.java new file mode 100644 index 0000000..1d04e7c --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ClientInvoker.java @@ -0,0 +1,24 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.io; + +import java.lang.reflect.InvocationHandler; + +public interface ClientInvoker extends Service { + + InvocationHandler getProxy(String address, String service, ClassLoader classLoader); + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ProtocolCodec.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ProtocolCodec.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ProtocolCodec.java new file mode 100644 index 0000000..e5ea148 --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ProtocolCodec.java @@ -0,0 +1,97 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.io; + +import java.io.IOException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; + + + +/** + * Interface to encode and decode commands in and out of a a non blocking channel. + * + */ +public interface ProtocolCodec { + + /////////////////////////////////////////////////////////////////// + // + // Methods related with reading from the channel + // + /////////////////////////////////////////////////////////////////// + + /** + * @param channel + */ + public void setReadableByteChannel(ReadableByteChannel channel); + + /** + * Non-blocking channel based decoding. + * + * @return + * @throws IOException + */ + Object read() throws IOException; + + /** + * @return The number of bytes received. + */ + public long getReadCounter(); + + + /////////////////////////////////////////////////////////////////// + // + // Methods related with writing to the channel + // + /////////////////////////////////////////////////////////////////// + + + enum BufferState { + EMPTY, + WAS_EMPTY, + NOT_EMPTY, + FULL, + } + + public void setWritableByteChannel(WritableByteChannel channel); + + /** + * Non-blocking channel based encoding. + * + * @return true if the write completed. + * @throws IOException + */ + BufferState write(Object value) throws IOException; + + /** + * Attempts to complete the previous write which did not complete. + * @return + * @throws IOException + */ + BufferState flush() throws IOException; + + /** + * @return true if the codec will no accept any more writes. + */ + boolean full(); + + /** + * @return The number of bytes written. + */ + public long getWriteCounter(); + + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java new file mode 100644 index 0000000..bc1d68c --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java @@ -0,0 +1,34 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.io; + +public interface ServerInvoker extends Service { + + String getConnectAddress(); + + void registerService(String id, ServiceFactory service, ClassLoader classLoader); + + void unregisterService(String id); + + + public interface ServiceFactory { + + Object get(); + + void unget(); + + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Service.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Service.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Service.java new file mode 100644 index 0000000..71bbd5d --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Service.java @@ -0,0 +1,52 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.io; + + +/** + * The core lifecyle interface for ActiveMQ components. + * + * @version $Revision: 1.1 $ + */ +public interface Service { + + /** + * Starts the service. No guarantee is given that the service has fully started + * by the time this method returns. + */ + void start() throws Exception; + + /** + * Starts the service. Executes the onComplete runnable once the service has fully started up. + * + * @param onComplete my be set to null if not interested in a callback. + */ + void start(Runnable onComplete) throws Exception; + + /** + * Stops the service. No guarantee is given that the service has fully stopped + * by the time this method returns. + */ + void stop(); + + /** + * Stops the service. Executes the onComplete runnable once the service has fully stopped. + * + * @param onComplete my be set to null if not interested in a callback. + */ + void stop(Runnable onComplete); + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Transport.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Transport.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Transport.java new file mode 100644 index 0000000..f41b478 --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Transport.java @@ -0,0 +1,95 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.io; + +import org.apache.aries.rsa.provider.fastbin.api.Dispatched; +import org.fusesource.hawtdispatch.DispatchQueue; + +/** + * Represents an abstract connection. It can be a client side or server side connection. + * + */ +public interface Transport extends Service, Dispatched { + + + boolean full(); + + /** + * A one way asynchronous send of a command. Only sent if the the transport is not full. + * + * @param command + * @return true if the command was accepted. + */ + boolean offer(Object command); + + /** + * Returns the current transport listener + * + * @return + */ + TransportListener getTransportListener(); + + /** + * Registers an inbound command listener + * + * @param commandListener + */ + void setTransportListener(TransportListener commandListener); + + /** + * Sets the dispatch queue used by the transport + * + * @param queue + */ + void setDispatchQueue(DispatchQueue queue); + + /** + * suspend delivery of commands. + */ + void suspendRead(); + + /** + * resume delivery of commands. + */ + void resumeRead(); + + /** + * @return the remote address for this connection + */ + String getRemoteAddress(); + + /** + * @return true if the transport is disposed + */ + boolean isDisposed(); + + /** + * @return true if the transport is connected + */ + boolean isConnected(); + + /** + * @return The protocol codec for the transport. + */ + ProtocolCodec getProtocolCodec(); + + /** + * Sets the protocol codec for the transport + * @param protocolCodec + */ + void setProtocolCodec(ProtocolCodec protocolCodec); + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportAcceptListener.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportAcceptListener.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportAcceptListener.java new file mode 100644 index 0000000..247978c --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportAcceptListener.java @@ -0,0 +1,31 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.io; + + +import org.apache.aries.rsa.provider.fastbin.tcp.TcpTransport; + +/** + * Implemented by object that need to get injected by + * + */ +public interface TransportAcceptListener { + + void onAccept(TransportServer transportServer, TcpTransport transport); + + void onAcceptError(TransportServer transportServer, Exception error); + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportListener.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportListener.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportListener.java new file mode 100644 index 0000000..4b16bde --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportListener.java @@ -0,0 +1,55 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.io; + +import java.io.IOException; + + +/** + * An asynchronous listener of commands + * + */ +public interface TransportListener { + + /** + * called to process a command + * @param command + */ + void onTransportCommand(Transport transport, Object command); + + /** + * transport can now accept more commands for transmission. + */ + void onRefill(Transport transport); + + /** + * An unrecoverable exception has occured on the transport + * @param error + */ + void onTransportFailure(Transport transport, IOException error); + + /** + * The transport has been connected. + */ + public void onTransportConnected(Transport transport); + + /** + * The transport has suffered a disconnection from + * which it hopes to recover + */ + public void onTransportDisconnected(Transport transport); + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportServer.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportServer.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportServer.java new file mode 100644 index 0000000..e8f620e --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportServer.java @@ -0,0 +1,73 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.io; + +import java.net.InetSocketAddress; + +import org.fusesource.hawtdispatch.DispatchQueue; + +/** + * A TransportServer asynchronously accepts {@see Transport} objects and then + * delivers those objects to a {@see TransportAcceptListener}. + * + * @version $Revision: 1.4 $ + */ +public interface TransportServer extends Service { + + /** + * Registers an {@see TransportAcceptListener} which is notified of accepted + * channels. + * + * @param acceptListener + */ + void setAcceptListener(TransportAcceptListener acceptListener); + + String getBoundAddress(); + + String getConnectAddress(); + + /** + * @return The socket address that this transport is accepting connections + * on or null if this does not or is not currently accepting + * connections on a socket. + */ + InetSocketAddress getSocketAddress(); + + /** + * Returns the dispatch queue used by the transport + * + * @return + */ + DispatchQueue getDispatchQueue(); + + /** + * Sets the dispatch queue used by the transport + * + * @param queue + */ + void setDispatchQueue(DispatchQueue queue); + + /** + * suspend accepting new transports + */ + void suspend(); + + /** + * resume accepting new transports + */ + void resume(); + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java new file mode 100644 index 0000000..33b4b23 --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java @@ -0,0 +1,188 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.tcp; + +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.rmi.RemoteException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.aries.rsa.provider.fastbin.api.AsyncCallback; +import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy; +import org.fusesource.hawtbuf.DataByteArrayInputStream; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; + +/** + * <p> + * </p> + * + */ +public class AsyncInvocationStrategy implements InvocationStrategy { + + public static final AsyncInvocationStrategy INSTANCE = new AsyncInvocationStrategy(); + + static public boolean isAsyncMethod(Method method) { + Class<?>[] types = method.getParameterTypes(); + return types.length != 0 && types[types.length - 1] == AsyncCallback.class; + } + + + private class AsyncResponseFuture implements ResponseFuture { + + private final ClassLoader loader; + private final Method method; + private final AsyncCallback callback; + private final SerializationStrategy serializationStrategy; + private final DispatchQueue queue; + + public AsyncResponseFuture(ClassLoader loader, Method method, AsyncCallback callback, SerializationStrategy serializationStrategy, DispatchQueue queue) { + this.loader = loader; + this.method = method; + this.callback = callback; + this.serializationStrategy = serializationStrategy; + this.queue = queue; + } + + public void set(final DataByteArrayInputStream source) { + if( queue!=null ) { + queue.execute(new Runnable() { + public void run() { + decodeIt(source); + } + }); + } else { + decodeIt(source); + } + } + + private void decodeIt(DataByteArrayInputStream source) { + try { + serializationStrategy.decodeResponse(loader, getResultType(method), source, callback); + } catch (Throwable e) { + e.printStackTrace(); + } + } + + public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + // TODO: we could store the timeout so we can time out the async request... + return null; + } + + @Override + public void fail(Throwable throwable) { + callback.onFailure(throwable); + } + } + + public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream target) throws Exception { + if(!isAsyncMethod(method)) { + throw new IllegalArgumentException("Invalid async method declaration: last argument is not a RequestCallback"); + } + + Class[] new_types = payloadTypes(method); + Object[] new_args = new Object[args.length-1]; + System.arraycopy(args, 0, new_args, 0, new_args.length); + + serializationStrategy.encodeRequest(loader, new_types, new_args, target); + + return new AsyncResponseFuture(loader, method, (AsyncCallback) args[args.length-1], serializationStrategy, Dispatch.getCurrentQueue()); + } + + static private Class<?>[] payloadTypes(Method method) { + Class<?>[] types = method.getParameterTypes(); + Class<?>[] new_types = new Class<?>[types.length-1]; + System.arraycopy(types, 0, new_types, 0, new_types.length); + return new_types; + } + + static private Class getResultType(Method method) { + Type[] types = method.getGenericParameterTypes(); + ParameterizedType t = (ParameterizedType) types[types.length-1]; + return (Class) t.getActualTypeArguments()[0]; + } + + + class ServiceResponse { + + private final ClassLoader loader; + private final Method method; + private final DataByteArrayOutputStream responseStream; + private final Runnable onComplete; + private final SerializationStrategy serializationStrategy; + private final int pos; + // Used to protect against sending multiple responses. + final AtomicBoolean responded = new AtomicBoolean(false); + + public ServiceResponse(ClassLoader loader, Method method, DataByteArrayOutputStream responseStream, Runnable onComplete, SerializationStrategy serializationStrategy) { + this.loader = loader; + this.method = method; + this.responseStream = responseStream; + this.onComplete = onComplete; + this.serializationStrategy = serializationStrategy; + pos = responseStream.position(); + } + + public void send(Throwable error, Object value) { + if( responded.compareAndSet(false, true) ) { + Class resultType = getResultType(method); + try { + serializationStrategy.encodeResponse(loader, resultType, value, error, responseStream); + } catch (Exception e) { + // we failed to encode the response.. reposition and write that error. + try { + responseStream.position(pos); + serializationStrategy.encodeResponse(loader, resultType, value, new RemoteException(e.toString()), responseStream); + } catch (Exception unexpected) { + unexpected.printStackTrace(); + } + } finally { + onComplete.run(); + } + } + } + + + } + public void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, final DataByteArrayOutputStream responseStream, final Runnable onComplete) { + + final ServiceResponse helper = new ServiceResponse(loader, method, responseStream, onComplete, serializationStrategy); + try { + + Object[] new_args = new Object[method.getParameterTypes().length]; + serializationStrategy.decodeRequest(loader, payloadTypes(method), requestStream, new_args); + new_args[new_args.length-1] = new AsyncCallback<Object>() { + public void onSuccess(Object result) { + helper.send(null, result); + } + public void onFailure(Throwable failure) { + helper.send(failure, null); + } + }; + method.invoke(target, new_args); + + } catch (Throwable t) { + helper.send(t, null); + } + + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java new file mode 100644 index 0000000..d695e4f --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java @@ -0,0 +1,126 @@ +/** + * Copyright 2005-2015 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.aries.rsa.provider.fastbin.tcp; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.rmi.RemoteException; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +import org.apache.aries.rsa.provider.fastbin.api.AsyncCallback; +import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy; +import org.fusesource.hawtbuf.DataByteArrayInputStream; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.fusesource.hawtdispatch.Dispatch; + +/** + * <p> + * </p> + * + */ +public class BlockingInvocationStrategy implements InvocationStrategy { + + public static final BlockingInvocationStrategy INSTANCE = new BlockingInvocationStrategy(); + + private static final Callable<Object> EMPTY_CALLABLE = new Callable<Object>() { + public Object call() { + return null; + } + }; + + private class BlockingResponseFuture extends FutureTask<Object> implements ResponseFuture, AsyncCallback { + + private final ClassLoader loader; + private final Method method; + private final SerializationStrategy serializationStrategy; + + public BlockingResponseFuture(ClassLoader loader, Method method, SerializationStrategy serializationStrategy) { + super(EMPTY_CALLABLE); + this.loader = loader; + this.method = method; + this.serializationStrategy = serializationStrategy; + } + + public void set(DataByteArrayInputStream source) throws IOException, ClassNotFoundException { + try { + serializationStrategy.decodeResponse(loader, method.getReturnType(), source, this); + } catch (Throwable e) { + super.setException(e); + } + } + + public void fail(Throwable failure) { + super.setException(failure); + } + + public void onSuccess(Object result) { + super.set(result); + } + + public void onFailure(Throwable failure) { + super.setException(failure); + } + } + + public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream target) throws Exception { + + assert Dispatch.getCurrentQueue() == null : "You should not do blocking RPC class when executing on a dispatch queue"; + + serializationStrategy.encodeRequest(loader, method.getParameterTypes(), args, target); + return new BlockingResponseFuture(loader, method, serializationStrategy); + } + + public void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream, Runnable onComplete) { + + int pos = responseStream.position(); + try { + + Object value = null; + Throwable error = null; + + try { + Class<?>[] types = method.getParameterTypes(); + final Object[] args = new Object[types.length]; + serializationStrategy.decodeRequest(loader, types, requestStream, args); + value = method.invoke(target, args); + } catch (Throwable t) { + if (t instanceof InvocationTargetException) { + error = t.getCause(); + } else { + error = t; + } + } + + serializationStrategy.encodeResponse(loader, method.getReturnType(), value, error, responseStream); + + } catch(Exception e) { + + // we failed to encode the response.. reposition and write that error. + try { + responseStream.position(pos); + serializationStrategy.encodeResponse(loader, method.getReturnType(), null, new RemoteException(e.toString()), responseStream); + } catch (Exception unexpected) { + unexpected.printStackTrace(); + } + + } finally { + onComplete.run(); + } + } + +}
