Author: edwardsmj
Date: Mon Jul  5 11:15:25 2010
New Revision: 960545

URL: http://svn.apache.org/viewvc?rev=960545&view=rev
Log:
Initial version of async client endpoint for async response messages for Phase 
II of TUSCANY-3612

Modified:
    
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
    
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java

Modified: 
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
URL: 
http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java?rev=960545&r1=960544&r2=960545&view=diff
==============================================================================
--- 
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
 (original)
+++ 
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
 Mon Jul  5 11:15:25 2010
@@ -20,6 +20,7 @@
 package org.apache.tuscany.sca.core.invocation.impl;
 
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -30,6 +31,9 @@ import java.util.concurrent.locks.Reentr
 
 import javax.xml.ws.Response;
 
+import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
+
 /**
  * A class which provides an Implementation of a Future<V> and Response<V> for 
use with the JAXWS defined client
  * asynchronous APIs.
@@ -41,7 +45,7 @@ import javax.xml.ws.Response;
  *
  * @param <V> - this is the type of the response message from the invoked 
service.
  */
-public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V> {
+public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, 
AsyncResponseHandler<V> {
        
        // Lock for handling the completion of this Future
        private final Lock lock = new ReentrantLock();
@@ -51,6 +55,8 @@ public class AsyncInvocationFutureImpl<V
        private volatile V response = null;
        private volatile Throwable fault = null; 
        
+       private String uniqueID = UUID.randomUUID().toString();
+       
        protected AsyncInvocationFutureImpl() {
                super();
        } // end constructor
@@ -138,8 +144,10 @@ public class AsyncInvocationFutureImpl<V
         * @param e - the Fault to send
         * @throws IllegalStateException if either the setResponse method or 
the setFault method have been called previously
         */
-       public void setFault(Throwable e) {
+       public void setFault(AsyncFaultWrapper w) {
 
+               Exception e = w.retrieveFault();
+               if( e != null ) throw new 
IllegalArgumentException("AsyncFaultWrapper did not return an Exception");
                lock.lock();
                try {
                        if( notSetYet() ) {
@@ -176,13 +184,18 @@ public class AsyncInvocationFutureImpl<V
        } // end method setResponse
        
        /**
+        * Gets the unique ID of this future as a String
+        */
+       public String getUniqueID() { return uniqueID; }
+
+       /**
         * Indicates that setting a response value is OK - can only set the 
response value or fault once
         * @return - true if it is OK to set the response, false otherwise
         */
        private boolean notSetYet() {
                return ( response == null && fault == null );
        }
-
+       
        /**
         * Returns the JAXWS context for the response
         * @return - a Map containing the context

Modified: 
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
URL: 
http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java?rev=960545&r1=960544&r2=960545&view=diff
==============================================================================
--- 
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
 (original)
+++ 
tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
 Mon Jul  5 11:15:25 2010
@@ -19,17 +19,64 @@
 
 package org.apache.tuscany.sca.core.invocation.impl;
 
+import java.io.StringReader;
 import java.lang.reflect.Method;
 import java.lang.reflect.Type;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.Future;
 
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
 import javax.xml.ws.AsyncHandler;
 import javax.xml.ws.Response;
 
+import org.apache.tuscany.sca.assembly.AssemblyFactory;
+import org.apache.tuscany.sca.assembly.Binding;
+import org.apache.tuscany.sca.assembly.ComponentService;
+import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.assembly.Implementation;
+import org.apache.tuscany.sca.assembly.builder.BindingBuilder;
+import org.apache.tuscany.sca.assembly.builder.BuilderContext;
+import org.apache.tuscany.sca.assembly.builder.BuilderExtensionPoint;
+import org.apache.tuscany.sca.assembly.xml.Constants;
+import org.apache.tuscany.sca.context.CompositeContext;
+import org.apache.tuscany.sca.contribution.processor.ContributionReadException;
+import org.apache.tuscany.sca.contribution.processor.ProcessorContext;
+import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor;
+import 
org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint;
+import org.apache.tuscany.sca.contribution.processor.ValidatingXMLInputFactory;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory;
+import org.apache.tuscany.sca.core.assembly.impl.RuntimeEndpointReferenceImpl;
+import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
+import org.apache.tuscany.sca.interfacedef.InterfaceContract;
+import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException;
+import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract;
+import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory;
+import org.apache.tuscany.sca.invocation.InvocationChain;
 import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.policy.Intent;
+import org.apache.tuscany.sca.provider.ImplementationProvider;
+import org.apache.tuscany.sca.provider.ImplementationProviderFactory;
+import org.apache.tuscany.sca.provider.PolicyProvider;
+import org.apache.tuscany.sca.provider.RuntimeProvider;
+import org.apache.tuscany.sca.provider.ServiceBindingProvider;
 import org.apache.tuscany.sca.runtime.Invocable;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
+import org.oasisopen.sca.ComponentContext;
 import org.oasisopen.sca.ServiceReference;
+import org.oasisopen.sca.ServiceRuntimeException;
+import org.oasisopen.sca.annotation.AsyncInvocation;
 
 /**
  * An InvocationHandler which deals with JAXWS-defined asynchronous client 
Java API method calls
@@ -107,24 +154,25 @@ public class AsyncJDKInvocationHandler e
      * @return - the Response<?> object that is returned to the client 
application, typed by the 
      *           type of the response
      */
-    protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, 
Object[] args) {
+    @SuppressWarnings("unchecked")
+       protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, 
Object[] args) {
         Object response;
-        boolean isException;
         Class<?> returnType = getNonAsyncMethod(asyncMethod).getReturnType();
         // Allocate the Future<?> / Response<?> object - note: Response<?> is 
a subclass of Future<?>
         AsyncInvocationFutureImpl future = 
AsyncInvocationFutureImpl.newInstance( returnType );
         try {
-            response = super.invoke(proxy, getNonAsyncMethod(asyncMethod), 
args);
-            isException = false;
+            response = invokeAsync(proxy, getNonAsyncMethod(asyncMethod), 
args, future);
             future.setResponse(response);
-        } catch (Throwable e) {
-            response = e;
-            isException = true;
-            future.setFault(e);
-        }
+        } catch (Exception e) {
+            future.setFault( new AsyncFaultWrapper(e) );
+        } catch (Throwable t ) {
+               Exception e = new ServiceRuntimeException("Received Throwable: 
" + t.getClass().getName() + 
+                                                                 " when 
invoking: " + asyncMethod.getName(), t);
+               future.setFault( new AsyncFaultWrapper(e) );
+        } // end try 
         return future;
         //return new AsyncResponse(response, isException);
-    }
+    } // end method doInvokeAsyncPoll
 
     /**
      * Invoke an async callback method
@@ -134,14 +182,273 @@ public class AsyncJDKInvocationHandler e
      * @return - the Future<?> object that is returned to the client 
application, typed by the type of
      *           the response
      */
-    private Object doInvokeAsyncCallback(Object proxy, Method asyncMethod, 
Object[] args) {
+    @SuppressWarnings("unchecked")
+       private Object doInvokeAsyncCallback(Object proxy, Method asyncMethod, 
Object[] args) {
         AsyncHandler handler = (AsyncHandler)args[args.length-1];
         Response response = 
doInvokeAsyncPoll(proxy,asyncMethod,Arrays.copyOf(args, args.length-1));
         handler.handleResponse(response);
         
         return response;
-    }
+    } // end method doInvokeAsyncCallback
+
+    /**
+     * Invoke the target method on 
+     * @param proxy
+     * @param method - the method to invoke
+     * @param args - arguments for the call
+     * @param future - Future for handling the response
+     * @return - returns the response from the invocation
+     * @throws Throwable - if an exception is thrown during the invocation
+     */
+    @SuppressWarnings("unchecked")
+       private Object invokeAsync(Object proxy, Method method, Object[] args, 
AsyncInvocationFutureImpl future) throws Throwable {
+        if (Object.class == method.getDeclaringClass()) {
+            return invokeObjectMethod(method, args);
+        }
+        if (source == null) {
+            throw new ServiceRuntimeException("No runtime source is 
available");
+        }
+        
+        if (source instanceof RuntimeEndpointReference) {
+            RuntimeEndpointReference epr = (RuntimeEndpointReference)source;
+            if (epr.isOutOfDate()) {
+                epr.rebuild();
+                chains.clear();
+            }
+        }
+        
+        InvocationChain chain = getInvocationChain(method, source);
+        
+        if (chain == null) {
+            throw new IllegalArgumentException("No matching operation is 
found: " + method);
+        }
+        
+        RuntimeEndpoint theEndpoint = getAsyncCallback( source );
+        attachFuture( theEndpoint, future );
+        
+        // send the invocation down the source
+        Object result = super.invoke(chain, args, source);
 
+        return result;
+    } // end method invokeAsync
+    
+    /**
+     * Attaches a future to the callback endpoint - so that the Future is 
triggered when a response is
+     * received from the asynchronous service invocation associated with the 
Future
+     * @param endpoint - the async callback endpoint
+     * @param future - the async invocation future to attach
+     */
+    private void attachFuture( RuntimeEndpoint endpoint, 
AsyncInvocationFutureImpl future ) {
+       Implementation impl = endpoint.getComponent().getImplementation();
+       AsyncResponseHandlerImpl<?> asyncHandler = 
(AsyncResponseHandlerImpl<?>) impl;
+       asyncHandler.addFuture(future);
+    } // end method attachFuture
+    
+    /**
+     * Get the async callback endpoint - if not already created, create and 
start it
+     * @param source - the RuntimeEndpointReference which needs an async 
callback endpoint
+     * @param future 
+     * @return - the RuntimeEndpoint of the async callback
+     */
+    private RuntimeEndpoint getAsyncCallback( Invocable source ) {
+       if( !(source instanceof RuntimeEndpointReference) ) return null;
+               RuntimeEndpointReference epr = (RuntimeEndpointReference) 
source;
+       if( !isAsyncInvocation( epr ) ) return null;
+       RuntimeEndpoint endpoint;
+       synchronized( epr ) {
+               endpoint = (RuntimeEndpoint)epr.getCallbackEndpoint();
+               if( endpoint != null ) return endpoint;
+               // Create the endpoint for the async callback
+               endpoint = createAsyncCallbackEndpoint( epr );
+               epr.setCallbackEndpoint(endpoint);
+       }
+       
+       // Activate the new callback endpoint
+       startEndpoint( epr.getCompositeContext(), endpoint );
+       endpoint.getInvocationChains();
+       
+       return endpoint;
+    } // end method setupAsyncCallback
+    
+    /**
+     * Start the callback endpoint
+     * @param compositeContext - the composite context
+     * @param ep - the endpoint to start
+     */
+    private void startEndpoint(CompositeContext compositeContext, 
RuntimeEndpoint ep ) {
+        for (PolicyProvider policyProvider : ep.getPolicyProviders()) {
+            policyProvider.start();
+        } // end for
+
+        final ServiceBindingProvider bindingProvider = ep.getBindingProvider();
+        if (bindingProvider != null) {
+            // Allow bindings to add shutdown hooks. Requires 
RuntimePermission shutdownHooks in policy.
+            AccessController.doPrivileged(new PrivilegedAction<Object>() {
+                public Object run() {
+                    bindingProvider.start();
+                    return null;
+                  }
+            });
+            compositeContext.getEndpointRegistry().addEndpoint(ep);
+        }
+    } // end method startEndpoint
+    
+    /**
+     * Create the async callback endpoint for a reference that is going to 
invoke an asyncInvocation service
+     * @param epr - the RuntimeEndpointReference for which the callback is 
created
+     * @return - a RuntimeEndpoint representing the callback endpoint
+     */
+    private RuntimeEndpoint createAsyncCallbackEndpoint( 
RuntimeEndpointReference epr ) {
+       CompositeContext compositeContext = epr.getCompositeContext();
+       RuntimeAssemblyFactory assemblyFactory = getAssemblyFactory( 
compositeContext );
+        RuntimeEndpoint endpoint = 
(RuntimeEndpoint)assemblyFactory.createEndpoint();
+        endpoint.bind( compositeContext );
+        
+        // Create a pseudo-component and pseudo-service 
+        // - need to end with a chain with an invoker into the 
AsyncCallbackHandler class
+        RuntimeComponent fakeComponent = null;
+        try {
+                       fakeComponent = 
(RuntimeComponent)epr.getComponent().clone();
+                       applyImplementation( fakeComponent );
+               } catch (CloneNotSupportedException e2) {
+                       // will not happen
+               } // end try
+        endpoint.setComponent(fakeComponent);
+        
+        // Create pseudo-service
+        ComponentService service = assemblyFactory.createComponentService();
+       ExtensionPointRegistry registry = 
compositeContext.getExtensionPointRegistry();
+        FactoryExtensionPoint modelFactories = 
registry.getExtensionPoint(FactoryExtensionPoint.class);
+        JavaInterfaceFactory javaInterfaceFactory = 
(JavaInterfaceFactory)modelFactories.getFactory(JavaInterfaceFactory.class);
+        JavaInterfaceContract interfaceContract = 
javaInterfaceFactory.createJavaInterfaceContract();
+        try {
+                       
interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseHandler.class));
+               } catch (InvalidInterfaceException e1) {
+                       // Nothing to do here - will not happen
+               } // end try
+        service.setInterfaceContract(interfaceContract);
+        String serviceName = epr.getReference().getName() + "_asyncCallback";
+        service.setName(serviceName);
+        endpoint.setService(service);
+        // Set pseudo-service onto the pseudo-component
+        List<ComponentService> services = fakeComponent.getServices();
+        services.clear();
+        services.add(service);
+        
+        Binding eprBinding = epr.getBinding();
+        try {
+                       Binding binding = (Binding)eprBinding.clone();
+                       // Create a binding
+                       binding = createMatchingBinding( eprBinding, 
fakeComponent, service, registry );
+                                       
+                       // Create a URI address for the callback based on the 
Component_Name/Reference_Name pattern
+                       //String callbackURI = "/" + 
epr.getComponent().getName() + "/" + serviceName;
+                       //binding.setURI(callbackURI);
+                       endpoint.setBinding(binding);
+               } catch (CloneNotSupportedException e) {
+                       // will not happen
+               } // end try
+               
+               // Need to establish policies here (binding has some...)
+               endpoint.getRequiredIntents().addAll( epr.getRequiredIntents() 
);
+               endpoint.getPolicySets().addAll( epr.getPolicySets() );
+               String epURI = epr.getComponent().getName() + 
"#service-binding(" + serviceName + "/" + serviceName + ")";
+               endpoint.setURI(epURI);
+        endpoint.setUnresolved(false);
+       return endpoint;
+    }
+    
+    /**
+     * Create a matching binding to a supplied binding
+     * - the matching binding has the same binding type, but is for the 
supplied component and service
+     * @param matchBinding - the binding to match
+     * @param component - the component 
+     * @param service - the service
+     * @param registry - registry for extensions
+     * @return - the matching binding, or null if it could not be created
+     */
+    @SuppressWarnings("unchecked")
+       private Binding createMatchingBinding( Binding matchBinding, 
RuntimeComponent component, 
+                                                      ComponentService 
service, ExtensionPointRegistry registry ) {
+       // Since there is no simple way to obtain a Factory for a binding where 
the type is not known ahead of
+       // time, the process followed here is to generate the <binding.xxx/> 
XML element from the binding type QName
+       // and then read the XML using the processor for that XML...
+       QName bindingName = matchBinding.getType();
+       String bindingXML = "<ns1:" + bindingName.getLocalPart() + " 
xmlns:ns1='" + bindingName.getNamespaceURI() + "'/>";
+       
+       StAXArtifactProcessorExtensionPoint processors = 
registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class);
+       StAXArtifactProcessor<?> processor = 
(StAXArtifactProcessor<?>)processors.getProcessor(bindingName);
+       
+       FactoryExtensionPoint modelFactories = 
registry.getExtensionPoint(FactoryExtensionPoint.class);
+       ValidatingXMLInputFactory inputFactory = 
modelFactories.getFactory(ValidatingXMLInputFactory.class);                    
+       StreamSource source = new StreamSource( new StringReader(bindingXML) );
+       
+       ProcessorContext context = new ProcessorContext();
+               try {
+                       XMLStreamReader reader = 
inputFactory.createXMLStreamReader(source);
+                       reader.next();
+                       Binding newBinding = (Binding) processor.read(reader, 
context );
+                       
+                       // Create a URI address for the callback based on the 
Component_Name/Reference_Name pattern
+                       String callbackURI = "/" + component.getName() + "/" + 
service.getName();
+                       newBinding.setURI(callbackURI);
+                       
+                       BuilderExtensionPoint builders = 
registry.getExtensionPoint(BuilderExtensionPoint.class);
+                       BindingBuilder builder = 
builders.getBindingBuilder(newBinding.getType());
+            if (builder != null) {
+               org.apache.tuscany.sca.assembly.builder.BuilderContext 
builderContext = new BuilderContext(registry);
+               builder.build(component, service, newBinding, builderContext, 
true);
+            } // end if
+                       
+                       return newBinding;
+               } catch (ContributionReadException e) {
+                       e.printStackTrace();
+               } catch (XMLStreamException e) {
+                       e.printStackTrace();
+               }
+       
+       return null;
+    } // end method createMatchingBinding
+    
+    /**
+     * Gets a RuntimeAssemblyFactory from the CompositeContext
+     * @param compositeContext
+     * @return the RuntimeAssemblyFactory
+     */
+    private RuntimeAssemblyFactory getAssemblyFactory( CompositeContext 
compositeContext ) {
+       ExtensionPointRegistry registry = 
compositeContext.getExtensionPointRegistry();
+        FactoryExtensionPoint modelFactories = 
registry.getExtensionPoint(FactoryExtensionPoint.class);
+        return 
(RuntimeAssemblyFactory)modelFactories.getFactory(AssemblyFactory.class);
+    } // end method RuntimeAssemblyFactory
+    
+    private void applyImplementation( RuntimeComponent component ) {
+       AsyncResponseHandlerImpl<?> asyncHandler = new 
AsyncResponseHandlerImpl<Object>();
+       component.setImplementation( asyncHandler );
+       component.setImplementationProvider( asyncHandler );
+        return;
+    } // end method getImplementationProvider
+    
+    private static QName ASYNC_INVOKE = new QName( Constants.SCA11_NS, 
"asyncInvocation" );
+    /**
+     * Determines if the service invocation is asynchronous
+     * @param source - the EPR involved in the invocation
+     * @return - true if the invocation is async
+     */
+    private boolean isAsyncInvocation( RuntimeEndpointReference source ) {
+               RuntimeEndpointReference epr = (RuntimeEndpointReference) 
source;
+               // First check is to see if the EPR itself has the 
asyncInvocation intent marked
+               for( Intent intent : epr.getRequiredIntents() ) {
+                       if ( intent.getName().equals(ASYNC_INVOKE) ) return 
true;
+               } // end for
+               
+               // Second check is to see if the target service has the 
asyncInvocation intent marked
+               Endpoint ep = epr.getTargetEndpoint();
+               for( Intent intent : ep.getRequiredIntents() ) {
+                       if ( intent.getName().equals(ASYNC_INVOKE) ) return 
true;
+               } // end for
+       return false;
+    } // end isAsyncInvocation
+    
     /**
      * Return the synchronous method that is the equivalent of an async method
      * @param asyncMethod - the async method


Reply via email to