Author: edwardsmj
Date: Sun Jul 11 10:09:27 2010
New Revision: 963034
URL: http://svn.apache.org/viewvc?rev=963034&view=rev
Log:
Changes and additions to core in support of Client-side and Server-side
asynchronous services and @asyncInvocation as described in TUSCANY-3608, 3611 &
3612
Added:
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java
(with props)
Modified:
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java
Modified:
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
URL:
http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java?rev=963034&r1=963033&r2=963034&view=diff
==============================================================================
---
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
(original)
+++
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
Sun Jul 11 10:09:27 2010
@@ -23,24 +23,43 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.io.StringReader;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.tuscany.sca.assembly.AssemblyFactory;
+import org.apache.tuscany.sca.assembly.Binding;
import org.apache.tuscany.sca.assembly.Component;
import org.apache.tuscany.sca.assembly.ComponentReference;
import org.apache.tuscany.sca.assembly.ComponentService;
import org.apache.tuscany.sca.assembly.CompositeReference;
import org.apache.tuscany.sca.assembly.CompositeService;
import org.apache.tuscany.sca.assembly.Contract;
+import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.assembly.Service;
+import org.apache.tuscany.sca.assembly.builder.BindingBuilder;
+import org.apache.tuscany.sca.assembly.builder.BuilderContext;
+import org.apache.tuscany.sca.assembly.builder.BuilderExtensionPoint;
import org.apache.tuscany.sca.assembly.impl.EndpointImpl;
import org.apache.tuscany.sca.context.CompositeContext;
+import org.apache.tuscany.sca.contribution.processor.ContributionReadException;
+import org.apache.tuscany.sca.contribution.processor.ProcessorContext;
+import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor;
+import
org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint;
+import org.apache.tuscany.sca.contribution.processor.ValidatingXMLInputFactory;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.UtilityExtensionPoint;
+import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
import org.apache.tuscany.sca.core.invocation.ExtensibleWireProcessor;
import org.apache.tuscany.sca.core.invocation.NonBlockingInterceptor;
import org.apache.tuscany.sca.core.invocation.RuntimeInvoker;
@@ -49,7 +68,10 @@ import org.apache.tuscany.sca.core.invoc
import org.apache.tuscany.sca.interfacedef.Compatibility;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.InterfaceContractMapper;
+import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException;
import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract;
+import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory;
import org.apache.tuscany.sca.invocation.Interceptor;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
@@ -68,6 +90,7 @@ import org.apache.tuscany.sca.runtime.En
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.apache.tuscany.sca.runtime.RuntimeWireProcessor;
import org.apache.tuscany.sca.runtime.RuntimeWireProcessorExtensionPoint;
import org.apache.tuscany.sca.work.WorkScheduler;
@@ -101,7 +124,7 @@ public class RuntimeEndpointImpl extends
protected InterfaceContract serviceInterfaceContract;
/**
- * No-arg constructor for Java serilization
+ * No-arg constructor for Java serialization
*/
public RuntimeEndpointImpl() {
super(null);
@@ -218,6 +241,15 @@ public class RuntimeEndpointImpl extends
}
public Message invoke(Message msg) {
+ // Async callback handling - performed before the message is passed to the binding invoker
+ // Ensure invocation chains are built (presumably this is what populates the callback EPR list - to be confirmed)
+ getInvocationChains();
+ if ( !this.getCallbackEndpointReferences().isEmpty() ) {
+ RuntimeEndpointReference asyncEPR = (RuntimeEndpointReference)
this.getCallbackEndpointReferences().get(0);
+ // Only the first callback EPR is used - it is placed into the message headers under the "ASYNC_CALLBACK" key
+ msg.getHeaders().put("ASYNC_CALLBACK", asyncEPR );
+ }
+ // end of async callback handling
return invoker.invokeBinding(msg);
}
@@ -288,10 +320,149 @@ public class RuntimeEndpointImpl extends
addServiceBindingInterceptor(chain, operation);
addImplementationInterceptor(serviceComponent, service, chain,
targetOperation);
chains.add(chain);
+
+ // Handle cases where the operation is an async server
+ if( targetOperation.isAsyncServer() ) {
+ createAsyncServerCallback( this, operation );
+ } // end if
}
wireProcessor.process(this);
}
+
+ /**
+ * Creates the async callback EndpointReference for the supplied Endpoint and stores it in the Endpoint's
+ * list of callback EndpointReferences - does nothing if that list already has an entry
+ * @param endpoint - the Endpoint which receives the callback EndpointReference
+ * @param operation - the async server Operation which triggered the call (not used by this method)
+ */
+ private void createAsyncServerCallback( RuntimeEndpoint endpoint,
Operation operation ) {
+ // A single callback EPR per Endpoint is enough - skip creation if one is already stored
+ if( asyncCallbackExists( endpoint ) ) return;
+
+ RuntimeEndpointReference asyncEPR = createAsyncEPR( endpoint );
+
+ // Store the new callback EPR into the Endpoint
+ endpoint.getCallbackEndpointReferences().add(asyncEPR);
+ } // end method createAsyncServerCallback
+
+ /**
+ * Creates the EndpointReference object for the async callback - a pseudo-reference typed by AsyncResponseHandler
+ * @param endpoint - the endpoint which has the async server operations
+ * @return the EndpointReference object representing the callback - its binding is null if no matching binding could be created
+ */
+ private RuntimeEndpointReference createAsyncEPR( RuntimeEndpoint endpoint
){
+ CompositeContext compositeContext = endpoint.getCompositeContext();
+ RuntimeAssemblyFactory assemblyFactory = getAssemblyFactory(
compositeContext );
+ RuntimeEndpointReference epr =
(RuntimeEndpointReference)assemblyFactory.createEndpointReference();
+ epr.bind( compositeContext );
+
+ // Create a pseudo-reference which uses the AsyncResponseHandler interface and is marked as a callback reference
+ ComponentReference reference =
assemblyFactory.createComponentReference();
+ ExtensionPointRegistry registry =
compositeContext.getExtensionPointRegistry();
+ FactoryExtensionPoint modelFactories =
registry.getExtensionPoint(FactoryExtensionPoint.class);
+ JavaInterfaceFactory javaInterfaceFactory =
(JavaInterfaceFactory)modelFactories.getFactory(JavaInterfaceFactory.class);
+ JavaInterfaceContract interfaceContract =
javaInterfaceFactory.createJavaInterfaceContract();
+ try {
+
interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseHandler.class));
+ } catch (InvalidInterfaceException e1) {
+ // Deliberately ignored - the author expects createJavaInterface() never to fail for AsyncResponseHandler
+ } // end try
+ reference.setInterfaceContract(interfaceContract);
+ String referenceName = endpoint.getService().getName() +
"_asyncCallback";
+ reference.setName(referenceName);
+ reference.setForCallback(true);
+ epr.setReference(reference);
+
+ // Create a binding of the same type as the binding of the endpoint
+ Binding binding = createMatchingBinding( endpoint.getBinding(),
(RuntimeComponent)endpoint.getComponent(), reference, registry );
+ epr.setBinding(binding);
+
+ // Policies: copy the required intents and policy sets of the endpoint onto the EPR (binding has some...)
+ epr.getRequiredIntents().addAll( endpoint.getRequiredIntents()
);
+ epr.getPolicySets().addAll( endpoint.getPolicySets() );
+ String eprURI = endpoint.getComponent().getName() +
"#reference-binding(" + referenceName + "/" + referenceName + ")";
+ epr.setURI(eprURI);
+
+ // Attach a dummy target endpoint to the epr, which is then marked as wired and resolved
+ RuntimeEndpoint ep =
(RuntimeEndpoint)assemblyFactory.createEndpoint();
+ ep.setUnresolved(false);
+ epr.setTargetEndpoint(ep);
+ //epr.setStatus(EndpointReference.Status.RESOLVED_BINDING);
+
epr.setStatus(EndpointReference.Status.WIRED_TARGET_FOUND_AND_MATCHED);
+ epr.setUnresolved(false);
+
+ return epr;
+ } // end method createAsyncEPR
+
+ private boolean asyncCallbackExists( RuntimeEndpoint endpoint ) {
+ if( endpoint.getCallbackEndpointReferences().isEmpty() ) return false; // no callback EPR stored yet
+ return true; // any stored callback EPR is treated as the async callback
+ } // end method asyncCallbackExists
+
+ /**
+ * Create a matching binding to a supplied binding
+ * - the matching binding has the same binding type, but is for the supplied component and reference
+ * @param matchBinding - the binding to match
+ * @param component - the component
+ * @param reference - the reference
+ * @param registry - registry for extensions
+ * @return - the matching binding, or null if reading the generated binding XML fails (the exception is only printed)
+ */
+ @SuppressWarnings("unchecked")
+ private Binding createMatchingBinding( Binding matchBinding,
RuntimeComponent component,
+ ComponentReference
reference, ExtensionPointRegistry registry ) {
+ // Since there is no simple way to obtain a Factory for a binding where
the type is not known ahead of
+ // time, the process followed here is to generate the <binding.xxx/>
XML element from the binding type QName
+ // and then read the XML using the processor for that XML...
+ QName bindingName = matchBinding.getType();
+ String bindingXML = "<ns1:" + bindingName.getLocalPart() + "
xmlns:ns1='" + bindingName.getNamespaceURI() + "'/>";
+
+ StAXArtifactProcessorExtensionPoint processors =
registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class);
+ StAXArtifactProcessor<?> processor =
(StAXArtifactProcessor<?>)processors.getProcessor(bindingName);
+
+ FactoryExtensionPoint modelFactories =
registry.getExtensionPoint(FactoryExtensionPoint.class);
+ ValidatingXMLInputFactory inputFactory =
modelFactories.getFactory(ValidatingXMLInputFactory.class);
+ StreamSource source = new StreamSource( new StringReader(bindingXML) );
+
+ ProcessorContext context = new ProcessorContext();
+ try {
+ XMLStreamReader reader =
inputFactory.createXMLStreamReader(source);
+ reader.next(); // advance past the start of the document - presumably onto the binding element (to be confirmed)
+ Binding newBinding = (Binding) processor.read(reader,
context );
+ newBinding.setName("asyncCallback");
+
+ // Create a URI address for the callback based on the
Component_Name/Reference_Name pattern
+ String callbackURI = "/" + component.getName() + "/" +
reference.getName();
+ //newBinding.setURI(callbackURI); - disabled, so callbackURI is currently computed but not used
+
+ BuilderExtensionPoint builders =
registry.getExtensionPoint(BuilderExtensionPoint.class);
+ BindingBuilder builder =
builders.getBindingBuilder(newBinding.getType());
+ if (builder != null) {
+ org.apache.tuscany.sca.assembly.builder.BuilderContext
builderContext = new BuilderContext(registry);
+ builder.build(component, reference, newBinding, builderContext,
true);
+ } // end if - bindings with no registered builder are returned as read
+
+ return newBinding;
+ } catch (ContributionReadException e) {
+ e.printStackTrace();
+ } catch (XMLStreamException e) {
+ e.printStackTrace();
+ }
+ // Reached only when reading the binding XML failed - the exception has been printed, not rethrown
+ return null;
+ } // end method createMatchingBinding
+
+ /**
+ * Gets a RuntimeAssemblyFactory from the CompositeContext
+ * @param compositeContext - the CompositeContext whose extension point registry is used to look up the factory
+ * @return the factory registered for AssemblyFactory, cast to RuntimeAssemblyFactory
+ */
+ private RuntimeAssemblyFactory getAssemblyFactory( CompositeContext
compositeContext ) {
+ ExtensionPointRegistry registry =
compositeContext.getExtensionPointRegistry();
+ FactoryExtensionPoint modelFactories =
registry.getExtensionPoint(FactoryExtensionPoint.class);
+ return
(RuntimeAssemblyFactory)modelFactories.getFactory(AssemblyFactory.class);
+ } // end method getAssemblyFactory
private void initServiceBindingInvocationChains() {
Modified:
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java
URL:
http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java?rev=963034&r1=963033&r2=963034&view=diff
==============================================================================
---
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java
(original)
+++
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java
Sun Jul 11 10:09:27 2010
@@ -19,6 +19,9 @@
package org.apache.tuscany.sca.core.invocation;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
/**
* A class which is used to wrap an Exception of any type thrown by an
asynchronous service operation and
* which is returned through a separate one-way message sent asynchronously
from the server to the client.
@@ -27,26 +30,73 @@ package org.apache.tuscany.sca.core.invo
public class AsyncFaultWrapper {
private String faultClassName = null;
- private Exception e = null;
+ private String faultMessage = null;
+ private AsyncFaultWrapper containedFault = null;
+
public AsyncFaultWrapper() {
super();
}
- public AsyncFaultWrapper( Exception e ) {
+ /**
+ * Constructor which creates an AsyncFaultWrapper which wraps the supplied Throwable
+ * @param e - a Throwable which is wrapped by this AsyncFaultWrapper - it is recorded by calling storeFault( e )
+ */
+ public AsyncFaultWrapper( Throwable e ) {
super();
storeFault( e );
}
- public void storeFault( Exception e ) {
- faultClassName = e.getClass().getCanonicalName();
- this.e = e;
+ /**
+ * Stores a given Throwable in this AsyncFaultWrapper - its class is recorded by binary name (Class.getName()), which
+ * is the form that ClassLoader.loadClass() requires when retrieveFault() re-creates the fault, together with its message.
+ * If the supplied Throwable itself contains an embedded Throwable ("cause"), this is recursively wrapped by a nested AsyncFaultWrapper
+ * @param e - the Throwable
+ */
+ public void storeFault( Throwable e ) {
+ setFaultClassName( e.getClass().getName() );
+ setFaultMessage( e.getMessage() );
+ Throwable cause = e.getCause();
+ if( cause != null ) setContainedFault( new AsyncFaultWrapper( cause ) );
}
- public Exception retrieveFault( ) {
- if( e != null ) return e;
- System.out.println( "Tried to retrieve Exception reom
AsyncFaultWrapper: " + faultClassName);
- return null;
- }
+ /**
+ * Retrieves the Throwable wrapped by this AsyncFaultWrapper
+ *
+ * Note: When this method is invoked, the method attempts to instantiate an instance of the wrapped Throwable.
+ * It does this using the Thread Context Class Loader (TCCL) - the caller *MUST* ensure that the TCCL has access
+ * to the class of the wrapped Throwable and also to the classes of any nested Throwables. The class must also have
+ * a public (String) constructor, or a public (String, Throwable) constructor when there is a nested fault
+ *
+ * @return - the Throwable wrapped by this AsyncFaultWrapper - the Throwable will contain any nested Throwable(s)
+ * in its cause property. Any Exception raised while loading or instantiating the class is not thrown but is
+ * returned in place of the wrapped Throwable (for example a ClassNotFoundException if the TCCL cannot see the class)
+ */
+ public Throwable retrieveFault( ) {
+ try {
+ ClassLoader tccl =
Thread.currentThread().getContextClassLoader();
+ Class<?> faultClass = tccl.loadClass(faultClassName);
+ Class<Throwable> xclass = (Class<Throwable>)
faultClass;
+ if( containedFault != null ) {
+ // If there is a nested fault, retrieve this
recursively
+ Constructor cons =
xclass.getConstructor(String.class, Throwable.class);
+ return (Throwable)
cons.newInstance(faultMessage, getContainedFault().retrieveFault());
+ } else {
+ Constructor cons =
xclass.getConstructor(String.class);
+ return (Throwable)
cons.newInstance(faultMessage);
+ } // end if
+ } catch (Exception e) {
+ return e; // the failure itself is handed back to the caller rather than thrown
+ } // end try
+ } // end method retrieveFault
+
+ public void setFaultClassName( String name ) { this.faultClassName =
name; } // this name is what retrieveFault() passes to ClassLoader.loadClass()
+ public String getFaultClassName() { return this.faultClassName; }
+ // Message of the wrapped Throwable - passed to its constructor when retrieveFault() re-creates the fault
+ public String getFaultMessage() { return faultMessage; }
+ public void setFaultMessage(String faultMessage) { this.faultMessage =
faultMessage; }
+ // Wrapper for the nested fault (the cause) - retrieveFault() treats null as meaning there is no nested fault
+ public AsyncFaultWrapper getContainedFault() { return containedFault; }
+ public void setContainedFault(AsyncFaultWrapper containedFault) {
this.containedFault = containedFault; }
-}
+} // end class AsyncFaultWrapper
Added:
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java
URL:
http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java?rev=963034&view=auto
==============================================================================
---
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java
(added)
+++
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java
Sun Jul 11 10:09:27 2010
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.core.invocation;
+
+/**
+ * An unchecked exception which is used to signal that a service has been invoked asynchronously
+ * and that the result will be sent separately
+ *
+ */
+public class AsyncResponseException extends RuntimeException {
+
+ private static final long serialVersionUID = 457954562860541631L;
+ // No detail message and no cause
+ public AsyncResponseException() {
+ super();
+ }
+ // arg0 - the detail message, arg1 - the cause; both are passed straight to RuntimeException
+ public AsyncResponseException(String arg0, Throwable arg1) {
+ super(arg0, arg1);
+ }
+ // arg0 - the detail message
+ public AsyncResponseException(String arg0) {
+ super(arg0);
+ }
+ // arg0 - the cause
+ public AsyncResponseException(Throwable arg0) {
+ super(arg0);
+ }
+
+} // end class AsyncResponseException
Propchange:
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java
URL:
http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java?rev=963034&r1=963033&r2=963034&view=diff
==============================================================================
---
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java
(original)
+++
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java
Sun Jul 11 10:09:27 2010
@@ -19,6 +19,7 @@
package org.apache.tuscany.sca.core.invocation;
+import org.oasisopen.sca.annotation.OneWay;
import org.oasisopen.sca.annotation.Remotable;
/**
@@ -34,6 +35,7 @@ public interface AsyncResponseHandler<V>
* @param e - the wrapper containing the Fault to send
* @throws IllegalStateException if either the setResponse method or
the setFault method have been called previously
*/
+ @OneWay
public void setFault(AsyncFaultWrapper e);
/**
@@ -41,6 +43,7 @@ public interface AsyncResponseHandler<V>
* @throws IllegalStateException if either the setResponse method or
the setFault method have been called previously
* @param res - the response message, which is of type V
*/
+ @OneWay
public void setResponse(V res);
}
Modified:
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
URL:
http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java?rev=963034&r1=963033&r2=963034&view=diff
==============================================================================
---
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
(original)
+++
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
Sun Jul 11 10:09:27 2010
@@ -57,6 +57,8 @@ public class AsyncInvocationFutureImpl<V
private String uniqueID = UUID.randomUUID().toString();
+ private ClassLoader classLoader = null;
+
protected AsyncInvocationFutureImpl() {
super();
} // end constructor
@@ -66,10 +68,13 @@ public class AsyncInvocationFutureImpl<V
* to be set for the class instances
* @param <V> - the type of the response from the asynchronously
invoked service
* @param type - the type of the AsyncInvocationFutureImpl expressed as
a parameter
+ * @param classLoader - the classloader used for the business interface
to which this Future applies
* @return - an instance of AsyncInvocationFutureImpl<V>
*/
- public static <V> AsyncInvocationFutureImpl<V> newInstance( Class<V>
type ) {
- return new AsyncInvocationFutureImpl<V>();
+ public static <V> AsyncInvocationFutureImpl<V> newInstance( Class<V>
type, ClassLoader classLoader ) {
+ AsyncInvocationFutureImpl<V> future = new
AsyncInvocationFutureImpl<V>();
+ future.setClassLoader( classLoader );
+ return future;
}
/**
@@ -146,8 +151,17 @@ public class AsyncInvocationFutureImpl<V
*/
public void setFault(AsyncFaultWrapper w) {
- Exception e = w.retrieveFault();
- if( e != null ) throw new
IllegalArgumentException("AsyncFaultWrapper did not return an Exception");
+ ClassLoader tccl =
Thread.currentThread().getContextClassLoader();
+ Throwable e;
+ try {
+ // Set the TCCL to the classloader of the business
interface
+
Thread.currentThread().setContextClassLoader(this.getClassLoader());
+ e = w.retrieveFault();
+ } finally {
+ Thread.currentThread().setContextClassLoader(tccl);
+ } // end try
+
+ if( e == null ) throw new
IllegalArgumentException("AsyncFaultWrapper did not return an Exception");
lock.lock();
try {
if( notSetYet() ) {
@@ -201,8 +215,25 @@ public class AsyncInvocationFutureImpl<V
* @return - a Map containing the context
*/
public Map<String, Object> getContext() {
- // TODO Auto-generated method stub
+ // Intentionally returns null
return null;
}
+
+ /**
+ * Gets the classloader associated with the business interface to which this Future relates
+ * @return the ClassLoader of the business interface - null if no classloader has been set
+ */
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+
+ /**
+ * Sets the classloader associated with the business interface to which this Future relates
+ * @param classLoader - the classloader of the business interface - setFault() installs it as the thread context classloader while the fault is retrieved
+ */
+ public void setClassLoader(ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ }
+
} // end class AsyncInvocationFutureImpl
Modified:
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
URL:
http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java?rev=963034&r1=963033&r2=963034&view=diff
==============================================================================
---
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
(original)
+++
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
Sun Jul 11 10:09:27 2010
@@ -21,15 +21,17 @@ package org.apache.tuscany.sca.core.invo
import java.io.StringReader;
import java.lang.reflect.Method;
-import java.lang.reflect.Type;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.xml.namespace.QName;
-import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.transform.stream.StreamSource;
@@ -54,29 +56,23 @@ import org.apache.tuscany.sca.contributi
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory;
-import org.apache.tuscany.sca.core.assembly.impl.RuntimeEndpointReferenceImpl;
import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
-import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory;
+import org.apache.tuscany.sca.interfacedef.util.FaultException;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.policy.Intent;
-import org.apache.tuscany.sca.provider.ImplementationProvider;
-import org.apache.tuscany.sca.provider.ImplementationProviderFactory;
import org.apache.tuscany.sca.provider.PolicyProvider;
-import org.apache.tuscany.sca.provider.RuntimeProvider;
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.Invocable;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
-import org.oasisopen.sca.ComponentContext;
import org.oasisopen.sca.ServiceReference;
-import org.oasisopen.sca.ServiceRuntimeException;
-import org.oasisopen.sca.annotation.AsyncInvocation;
+import org.oasisopen.sca.ServiceRuntimeException;
/**
* An InvocationHandler which deals with JAXWS-defined asynchronous client
Java API method calls
@@ -96,6 +92,15 @@ import org.oasisopen.sca.annotation.Asyn
public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
private static final long serialVersionUID = 1L;
+
+ private static int invocationCount = 10; // # of threads in the pool - also used as the capacity of its task queue
+ private static long maxWaitTime = 30; // Max wait time for
completion = 30sec
+ // NOTE: maxWaitTime is passed below as the keep-alive time argument of the ThreadPoolExecutor constructor (in seconds)
+ // Run the async service invocations using a ThreadPoolExecutor - core size = max size = invocationCount, bounded queue, no rejection handler supplied (so the default policy applies when threads and queue are all in use)
+ private static ThreadPoolExecutor theExecutor = new ThreadPoolExecutor(
invocationCount, invocationCount,
+
maxWaitTime, TimeUnit.SECONDS,
+
new ArrayBlockingQueue<Runnable>(
invocationCount ) );
+
public AsyncJDKInvocationHandler(MessageFactory messageFactory,
ServiceReference<?> callableReference) {
super(messageFactory, callableReference);
@@ -119,7 +124,7 @@ public class AsyncJDKInvocationHandler e
return doInvokeAsyncPoll(proxy, method, args);
} else {
// Regular synchronous method call
- return super.invoke(proxy, method, args);
+ return doInvokeSync(proxy, method, args);
}
}
@@ -156,13 +161,11 @@ public class AsyncJDKInvocationHandler e
*/
@SuppressWarnings("unchecked")
protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod,
Object[] args) {
- Object response;
Class<?> returnType = getNonAsyncMethod(asyncMethod).getReturnType();
// Allocate the Future<?> / Response<?> object - note: Response<?> is
a subclass of Future<?>
- AsyncInvocationFutureImpl future =
AsyncInvocationFutureImpl.newInstance( returnType );
+ AsyncInvocationFutureImpl future =
AsyncInvocationFutureImpl.newInstance( returnType, getInterfaceClassloader() );
try {
- response = invokeAsync(proxy, getNonAsyncMethod(asyncMethod),
args, future);
- future.setResponse(response);
+ invokeAsync(proxy, getNonAsyncMethod(asyncMethod), args, future);
} catch (Exception e) {
future.setFault( new AsyncFaultWrapper(e) );
} catch (Throwable t ) {
@@ -171,11 +174,31 @@ public class AsyncJDKInvocationHandler e
future.setFault( new AsyncFaultWrapper(e) );
} // end try
return future;
- //return new AsyncResponse(response, isException);
} // end method doInvokeAsyncPoll
+
+ /**
+ * Provide a synchronous invocation of a service operation that is either
synchronous or asynchronous
+ * @return
+ */
+ protected Object doInvokeSync(Object proxy, Method method, Object[] args)
throws Throwable {
+ if ( isAsyncInvocation( source ) ) {
+ // Target service is asynchronous
+ Class<?> returnType = method.getReturnType();
+ AsyncInvocationFutureImpl future =
AsyncInvocationFutureImpl.newInstance( returnType, getInterfaceClassloader() );
+ invokeAsync(proxy, method, args, future);
+ // Wait for some maximum time for the result - 1000 seconds here
+ // Really, if the service is async, the client should use async
client methods to invoke the service
+ // - and be prepared to wait a *really* long time
+ return future.get(1000, TimeUnit.SECONDS);
+ } else {
+ // Target service is not asynchronous, so perform sync
invocation
+ return super.invoke(proxy, method, args);
+ } // end if
+ } // end method doInvokeSync
/**
- * Invoke an async callback method
+ * Invoke an async callback method - note that this form of the async
client API has as its final parameter
+ * an AsyncHandler method, used for callbacks to the client code
* @param proxy - the reference proxy
* @param asyncMethod - the async method to invoke
* @param args - array of input arguments to the method
@@ -186,14 +209,17 @@ public class AsyncJDKInvocationHandler e
private Object doInvokeAsyncCallback(Object proxy, Method asyncMethod,
Object[] args) {
AsyncHandler handler = (AsyncHandler)args[args.length-1];
Response response =
doInvokeAsyncPoll(proxy,asyncMethod,Arrays.copyOf(args, args.length-1));
- handler.handleResponse(response);
+ // Invoke the callback handler, if present
+ if( handler != null ) {
+ handler.handleResponse(response);
+ } // end if
return response;
} // end method doInvokeAsyncCallback
/**
- * Invoke the target method on
- * @param proxy
+ * Invoke the target (synchronous) method asynchronously
+ * @param proxy - the reference proxy object
* @param method - the method to invoke
* @param args - arguments for the call
* @param future - Future for handling the response
@@ -201,10 +227,7 @@ public class AsyncJDKInvocationHandler e
* @throws Throwable - if an exception is thrown during the invocation
*/
@SuppressWarnings("unchecked")
- private Object invokeAsync(Object proxy, Method method, Object[] args,
AsyncInvocationFutureImpl future) throws Throwable {
- if (Object.class == method.getDeclaringClass()) {
- return invokeObjectMethod(method, args);
- }
+ private void invokeAsync(Object proxy, Method method, Object[] args,
AsyncInvocationFutureImpl future) throws Throwable {
if (source == null) {
throw new ServiceRuntimeException("No runtime source is
available");
}
@@ -215,7 +238,7 @@ public class AsyncJDKInvocationHandler e
epr.rebuild();
chains.clear();
}
- }
+ } // end if
InvocationChain chain = getInvocationChain(method, source);
@@ -223,22 +246,84 @@ public class AsyncJDKInvocationHandler e
throw new IllegalArgumentException("No matching operation is
found: " + method);
}
+ // Organize for an async service
RuntimeEndpoint theEndpoint = getAsyncCallback( source );
- attachFuture( theEndpoint, future );
+ boolean isAsyncService = false;
+ if( theEndpoint != null ) {
+ // ... the service is asynchronous ...
+ attachFuture( theEndpoint, future );
+ isAsyncService = true;
+ } else {
+ // ... the service is synchronous ...
+ } // end if
- // send the invocation down the source
- Object result = super.invoke(chain, args, source);
+ // Perform the invocations on separate thread...
+ theExecutor.execute( new separateThreadInvoker( chain, args,
source, future, isAsyncService ) );
- return result;
+ return;
} // end method invokeAsync
+    /**
+     * An inner class which acts as a runnable task for invoking services asynchronously on
+     * threads that are separate from those used to execute operations of components.
+     *
+     * This supports both synchronous services and asynchronous services:
+     * - asynchronous service: the invocation is sent with the unique ID of the Future and the
+     *   Future is not completed by this task when the invocation returns normally
+     * - synchronous service: the value returned by the invocation is set on the Future as its response
+     *
+     * Any exception thrown by the invocation is set on the Future as a fault, so that code waiting
+     * on the Future is released - with the single exception of the "AsyncResponse" marker (see run()).
+     */
+    private class separateThreadInvoker implements Runnable {
+
+        private final AsyncInvocationFutureImpl future;
+        private final InvocationChain chain;
+        private final Object[] args;
+        private final Invocable invocable;
+        private final boolean isAsyncService;
+
+        /**
+         * @param chain - the invocation chain to invoke
+         * @param args - arguments for the invocation
+         * @param invocable - the source of the invocation, passed through to invoke()
+         * @param future - the Future which receives the response or the fault
+         * @param isAsyncService - true if the target service is asynchronous
+         */
+        public separateThreadInvoker( InvocationChain chain, Object[] args, Invocable invocable,
+                                      AsyncInvocationFutureImpl future, boolean isAsyncService ) {
+            super();
+            this.chain = chain;
+            this.args = args;
+            this.invocable = invocable;
+            this.future = future;
+            this.isAsyncService = isAsyncService;
+        } // end constructor
+
+        /**
+         * Performs the invocation and reports its outcome through the Future.
+         */
+        public void run() {
+            try {
+                if( isAsyncService ) {
+                    invoke(chain, args, invocable, future.getUniqueID());
+                    // The result is returned asynchronously via the future...
+                } else {
+                    // ... the service is synchronous ...
+                    Object result = invoke(chain, args, invocable);
+                    future.setResponse(result);
+                } // end if
+            } catch ( ServiceRuntimeException s ) {
+                Throwable e = s.getCause();
+                if( e instanceof FaultException && "AsyncResponse".equals(e.getMessage()) ) {
+                    // Do nothing...
+                    // NOTE(review): presumably this FaultException only signals that the response
+                    // will arrive later via the callback endpoint - confirm
+                } else {
+                    // Every other ServiceRuntimeException must complete the Future. Previously an
+                    // exception with no cause, or with a cause other than a FaultException, was
+                    // dropped here and the Future was never completed.
+                    future.setFault( new AsyncFaultWrapper( s ) );
+                } // end if
+            } catch ( Throwable t ) {
+                System.out.println("Async invoke got exception: " + t.toString());
+                future.setFault( new AsyncFaultWrapper( t ) );
+            } // end try
+
+        } // end method run
+
+    } // end class separateThreadInvoker
+
/**
* Attaches a future to the callback endpoint - so that the Future is
triggered when a response is
* received from the asynchronous service invocation associated with the
Future
* @param endpoint - the async callback endpoint
* @param future - the async invocation future to attach
*/
- private void attachFuture( RuntimeEndpoint endpoint,
AsyncInvocationFutureImpl future ) {
+ private void attachFuture( RuntimeEndpoint endpoint,
AsyncInvocationFutureImpl<?> future ) {
Implementation impl = endpoint.getComponent().getImplementation();
AsyncResponseHandlerImpl<?> asyncHandler =
(AsyncResponseHandlerImpl<?>) impl;
asyncHandler.addFuture(future);
@@ -257,11 +342,12 @@ public class AsyncJDKInvocationHandler e
RuntimeEndpoint endpoint;
synchronized( epr ) {
endpoint = (RuntimeEndpoint)epr.getCallbackEndpoint();
+ // If the async callback endpoint is already created, return
it...
if( endpoint != null ) return endpoint;
// Create the endpoint for the async callback
endpoint = createAsyncCallbackEndpoint( epr );
epr.setCallbackEndpoint(endpoint);
- }
+ } // end synchronized
// Activate the new callback endpoint
startEndpoint( epr.getCompositeContext(), endpoint );
@@ -335,19 +421,9 @@ public class AsyncJDKInvocationHandler e
services.clear();
services.add(service);
- Binding eprBinding = epr.getBinding();
- try {
- Binding binding = (Binding)eprBinding.clone();
- // Create a binding
- binding = createMatchingBinding( eprBinding,
fakeComponent, service, registry );
-
- // Create a URI address for the callback based on the
Component_Name/Reference_Name pattern
- //String callbackURI = "/" +
epr.getComponent().getName() + "/" + serviceName;
- //binding.setURI(callbackURI);
- endpoint.setBinding(binding);
- } catch (CloneNotSupportedException e) {
- // will not happen
- } // end try
+ // Create a binding
+ Binding binding = createMatchingBinding( epr.getBinding(),
fakeComponent, service, registry );
+ endpoint.setBinding(binding);
// Need to establish policies here (binding has some...)
endpoint.getRequiredIntents().addAll( epr.getRequiredIntents()
);
@@ -421,6 +497,11 @@ public class AsyncJDKInvocationHandler e
return
(RuntimeAssemblyFactory)modelFactories.getFactory(AssemblyFactory.class);
} // end method RuntimeAssemblyFactory
+ /**
+ * Applies an AsyncResponseHandlerImpl as the implementation of a
RuntimeComponent
+ * - the AsyncResponseHandlerImpl acts as both the implementation class
and the implementation provider...
+ * @param component - the component
+ */
private void applyImplementation( RuntimeComponent component ) {
AsyncResponseHandlerImpl<?> asyncHandler = new
AsyncResponseHandlerImpl<Object>();
component.setImplementation( asyncHandler );
@@ -434,7 +515,8 @@ public class AsyncJDKInvocationHandler e
* @param source - the EPR involved in the invocation
* @return - true if the invocation is async
*/
- private boolean isAsyncInvocation( RuntimeEndpointReference source ) {
+ private boolean isAsyncInvocation( Invocable source ) {
+ if( !(source instanceof RuntimeEndpointReference) ) return false;
RuntimeEndpointReference epr = (RuntimeEndpointReference)
source;
// First check is to see if the EPR itself has the
asyncInvocation intent marked
for( Intent intent : epr.getRequiredIntents() ) {
@@ -462,5 +544,13 @@ public class AsyncJDKInvocationHandler e
}
}
throw new IllegalStateException("No synchronous method matching async
method " + asyncMethod.getName());
+ } // end method getNonAsyncMethod
+
+ /**
+ * Gets the classloader of the business interface
+ * @return - the result of businessInterface.getClassLoader()
+ * NOTE(review): presumably businessInterface is a java.lang.Class, in which case the result
+ * can be null for an interface loaded by the bootstrap classloader - confirm
+ */
+ private ClassLoader getInterfaceClassloader( ) {
+ return businessInterface.getClassLoader();
}
}
Modified:
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
URL:
http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java?rev=963034&r1=963033&r2=963034&view=diff
==============================================================================
---
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
(original)
+++
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
Sun Jul 11 10:09:27 2010
@@ -42,6 +42,7 @@ import org.apache.tuscany.sca.runtime.Ru
/**
* A class intended to form the final link in the chain calling into a Future
which represents
* the response to an asynchronous service invocation
+ *
* Most methods are dummies, required to fulfil the contracts for
ImplementationProvider, Implementation
* and Invoker, since this class collapses together the functions of these
separate interfaces, due to its
* specialized nature, where most of the function will never be used.
@@ -52,7 +53,7 @@ import org.apache.tuscany.sca.runtime.Ru
* message header. On receipt of each message, the class seeks out the Future
with that unique ID and completes the future
* either with a response message or with a Fault.
*
- * @param <V>
+ * @param <V>
*/
public class AsyncResponseHandlerImpl<V> implements AsyncResponseHandler<V>,
ImplementationProvider, Implementation, Invoker {
@@ -60,6 +61,9 @@ public class AsyncResponseHandlerImpl<V>
private ConcurrentHashMap< String, AsyncInvocationFutureImpl<?> > table
=
new ConcurrentHashMap< String, AsyncInvocationFutureImpl<?> >();
+ /**
+ * This class is its own invoker...
+ */
public Invoker createInvoker(RuntimeComponentService service,
Operation operation) {
return this;
@@ -144,13 +148,46 @@ public class AsyncResponseHandlerImpl<V>
public void setResponse(V res) { } // intentionally empty - this method does nothing with the supplied response
- public Message invoke(Message msg) {
- // TODO Auto-generated method stub
+    /**
+     * Name of the message header from which the unique ID of the async invocation is read.
+     * NOTE(review): JDKInvocationHandler stores the ID of the outgoing message under "MESSAGE_ID";
+     * presumably the binding supplies it under this name on the response - confirm
+     */
+    private static final String WS_MESSAGE_ID = "WS_MESSAGE_ID";
+
+    /**
+     * Method which is the termination for the invocation chain from the callback endpoint.
+     *
+     * Looks up the Future registered under the unique ID carried in the message header and
+     * completes it: with a fault if the response is an AsyncFaultWrapper, otherwise with the
+     * response value. If the message body is an Object array, its first element is taken as the
+     * response (null if the array is empty); any other body is taken as the response itself.
+     *
+     * If the ID header, the Future or the message body is missing, a diagnostic is printed and
+     * no Future is completed.
+     *
+     * @param msg - the Tuscany message containing the response from the async service invocation
+     * which is either the Response message or an exception of some kind
+     * @return - the same message, with its body set to null
+     */
+    public Message invoke(Message msg) {
// Get the unique ID from the message header
- // Fetch the Future with that Unique ID
- // Complete the Future with a Response message
- // ...or complete the Future with a Fault
- return null;
- }
+        String idValue = (String)msg.getHeaders().get(WS_MESSAGE_ID);
+        if( idValue == null ) {
+            System.out.println( "Async message ID not found ");
+        } else {
+            // Fetch the Future with that Unique ID
+            AsyncInvocationFutureImpl future = table.get(idValue);
+            if( future == null ) {
+                System.out.println("Future not found for id: " + idValue);
+            } else {
+                // Complete the Future with a Response message
+                Object payload = msg.getBody();
+                if( payload == null ) {
+                    System.out.println("Returned response message was null");
+                } else {
+                    Object response;
+                    if( payload instanceof Object[] ) {
+                        // Only Object arrays are unwrapped - a primitive array (e.g. byte[]) cannot
+                        // be cast to Object[] and is treated as the response value itself
+                        Object[] items = (Object[])payload;
+                        response = items.length > 0 ? items[0] : null;
+                    } else {
+                        response = payload;
+                    } // end if
+                    // instanceof is false for null, so a null response completes the Future
+                    // with a null value instead of failing with a NullPointerException
+                    if( response instanceof AsyncFaultWrapper ) {
+                        future.setFault((AsyncFaultWrapper) response );
+                    } else {
+                        future.setResponse(response);
+                    } // end if
+                } // end if
+            } // end if
+        } // end if
+
+        // Prepare an empty response message
+        msg.setBody(null);
+        return msg;
+    } // end method invoke
} // end class
Modified:
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java
URL:
http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java?rev=963034&r1=963033&r2=963034&view=diff
==============================================================================
---
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java
(original)
+++
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java
Sun Jul 11 10:09:27 2010
@@ -225,8 +225,22 @@ public class JDKInvocationHandler implem
/**
 * Sets the endpoint held in the target field of this handler
 * @param endpoint - the endpoint to store
 */
protected void setEndpoint(Endpoint endpoint) {
this.target = endpoint;
}
-
+
/**
 * Invoke the chain without a message ID - delegates to the four-argument invoke with a null msgID
 * @param chain - the chain
 * @param args - arguments to the invocation as an array of Objects
 * @param source - passed through unchanged to the four-argument invoke
 * @return - whatever the four-argument invoke returns
 * @throws Throwable - declared because the four-argument invoke declares it
 */
protected Object invoke(InvocationChain chain, Object[] args, Invocable
source)
+ throws Throwable {
+ return invoke( chain, args, source, null );
+ }
+
+ /**
+ * Invoke the chain
+ * @param chain - the chain
+ * @param args - arguments to the invocation as an array of Objects
+ * @param source - the Endpoint or EndpointReference to which the chain
relates
+ * @param msgID - an ID for the message being sent, may be null
+ * @return - the Response message from the invocation
+ * @throws Throwable - if any exception occurs during the invocation
+ */
+ protected Object invoke(InvocationChain chain, Object[] args, Invocable
source, String msgID)
throws Throwable {
Message msg = messageFactory.createMessage();
if (source instanceof RuntimeEndpointReference) {
@@ -250,6 +264,11 @@ public class JDKInvocationHandler implem
transferMessageHeaders( msg, msgContext);
ThreadMessageContext.setMessageContext(msg);
+
+ // If there is a supplied message ID, place its value into the Message
Header under "MESSAGE_ID"
+ if( msgID != null ){
+ msg.getHeaders().put("MESSAGE_ID", msgID);
+ } // end if
try {
// dispatch the source down the chain and get the response