Author: edwardsmj
Date: Sun Jul 11 10:10:42 2010
New Revision: 963037

URL: http://svn.apache.org/viewvc?rev=963037&view=rev
Log:
Changes and additions to Java invoker in support of Client-side and Server-side 
asynchronous services and @asyncInvocation as described in TUSCANY-3608, 3611 & 
3612

Modified:
    
tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
    
tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java

Modified: 
tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
URL: 
http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java?rev=963037&r1=963036&r2=963037&view=diff
==============================================================================
--- 
tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
 (original)
+++ 
tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
 Sun Jul 11 10:10:42 2010
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.tuscany.sca.core.factory.InstanceWrapper;
 import org.apache.tuscany.sca.core.factory.ObjectCreationException;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseException;
 import org.apache.tuscany.sca.interfacedef.DataType;
 import org.apache.tuscany.sca.interfacedef.Operation;
 import org.apache.tuscany.sca.interfacedef.java.JavaOperation;
@@ -70,7 +71,7 @@ public class JavaAsyncImplementationInvo
             // For an async server method, there is an extra input parameter, 
which is a DispatchResponse instance 
             // which is typed by the type of the response
             Class<?> responseType = op.getOutputType().getPhysical();
-            ResponseDispatch<?> dispatch = 
ResponseDispatchImpl.newInstance(responseType);
+            ResponseDispatch<?> dispatch = 
ResponseDispatchImpl.newInstance(responseType, msg );
             
             Object ret;
             Object[] payload2;
@@ -87,15 +88,12 @@ public class JavaAsyncImplementationInvo
             
             ret = method.invoke(instance, (Object[])payload2);
             
-            try {
-               ret = ((ResponseDispatchImpl<?>)dispatch).get(50, 
TimeUnit.SECONDS);
-            } catch (Throwable t) {
-               throw new InvocationTargetException(t);
-            } // end try
-            
-            scopeContainer.returnWrapper(wrapper, contextId);
+            //ret = ((ResponseDispatchImpl<?>)dispatch).get(50, 
TimeUnit.SECONDS);
+            throw new InvocationTargetException( new 
AsyncResponseException("AsyncResponse") );
+
+            //scopeContainer.returnWrapper(wrapper, contextId);
             
-            msg.setBody(ret);
+            //msg.setBody(ret);
         } catch (InvocationTargetException e) {
             Throwable cause = e.getTargetException();
             boolean isChecked = false;

Modified: 
tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
URL: 
http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java?rev=963037&r1=963036&r2=963037&view=diff
==============================================================================
--- 
tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
 (original)
+++ 
tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
 Sun Jul 11 10:10:42 2010
@@ -20,6 +20,8 @@
 package org.apache.tuscany.sca.implementation.java.invocation;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,7 +29,22 @@ import java.util.concurrent.locks.Condit
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.tuscany.sca.assembly.EndpointReference;
+import org.apache.tuscany.sca.context.CompositeContext;
+import org.apache.tuscany.sca.context.ThreadMessageContext;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.core.factory.ObjectFactory;
+import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
+import org.apache.tuscany.sca.core.invocation.CallbackReferenceObjectFactory;
+import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory;
+import org.apache.tuscany.sca.core.invocation.ProxyFactory;
+import org.apache.tuscany.sca.invocation.Message;
+import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
 import org.oasisopen.sca.ResponseDispatch;
+import org.oasisopen.sca.ServiceReference;
 
 /**
  * Implementation of the ResponseDispatch interface of the OASIS SCA Java API
@@ -45,6 +62,7 @@ public class ResponseDispatchImpl<T> imp
         * Generated serialVersionUID value
         */
        private static final long serialVersionUID = 300158355992568592L;
+    private static String WS_MESSAGE_ID = "WS_MESSAGE_ID";
        
        // A latch used to ensure that the sendResponse() and sendFault() 
operations are used at most once
        // The latch is initialized with the value "false"
@@ -57,12 +75,24 @@ public class ResponseDispatchImpl<T> imp
        private volatile T response = null;
        private volatile Throwable fault = null; 
        
-       public ResponseDispatchImpl( ) {
+       private ExtensionPointRegistry registry;
+       
+       // Service Reference used for the callback
+       private ServiceReference<AsyncResponseHandler<?>> callbackRef;
+       private String callbackAddress;
+       private String messageID;
+       
+       public ResponseDispatchImpl( Message msg ) {
                super();
+               callbackRef = getAsyncCallbackRef( msg );
+       
+               callbackAddress = msg.getFrom().getCallbackEndpoint().getURI();
+       messageID = (String) msg.getHeaders().get(WS_MESSAGE_ID);
+       
        } // end constructor
        
-       public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type ) {
-               return new ResponseDispatchImpl<T>();
+       public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type, 
Message msg ) {
+               return new ResponseDispatchImpl<T>( msg );
        }
        
        /**
@@ -89,6 +119,10 @@ public class ResponseDispatchImpl<T> imp
                } else {
                        throw new IllegalStateException("sendResponse() or 
sendFault() has been called previously");
                } // end if
+               // Now dispatch the response to the callback...
+               AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) 
callbackRef.getService();
+               setResponseHeaders();
+               handler.setFault(new AsyncFaultWrapper(e));
        } // end method sendFault
 
        /**
@@ -108,6 +142,10 @@ public class ResponseDispatchImpl<T> imp
                } else {
                        throw new IllegalStateException("sendResponse() or 
sendFault() has been called previously");
                } // end if
+               // Now dispatch the response to the callback...
+               AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) 
callbackRef.getService();
+               setResponseHeaders();
+               handler.setResponse(res);
        } // end method sendResponse
        
        public T get(long timeout, TimeUnit unit) throws Throwable {
@@ -133,4 +171,47 @@ public class ResponseDispatchImpl<T> imp
        private boolean sendOK() {
                return latch.compareAndSet(false, true);
        }
+       
+       /**
+        * Creates a service reference for the async callback, based on
+        * information contained in the supplied message.
+        * <p>
+        * The callback endpoint reference is read from the "ASYNC_CALLBACK"
+        * header of the message. As a side effect, the registry field is set
+        * from the composite context of that endpoint reference.
+        * <p>
+        * NOTE(review): sendResponse() and sendFault() call
+        * callbackRef.getService() with no null check visible in this diff -
+        * confirm that a message without the header cannot reach them.
+        *
+        * @param msg - the incoming message
+        * @return - a ServiceReference typed by AsyncResponseHandler, or null
+        *           when the message carries no "ASYNC_CALLBACK" header
+        */
+       @SuppressWarnings("unchecked")
+       private ServiceReference<AsyncResponseHandler<?>> getAsyncCallbackRef( 
Message msg ) { 
+       RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) 
msg.getHeaders().get("ASYNC_CALLBACK");
+       if( callbackEPR == null ) return null; // no callback header: the caller receives a null reference
+       
+       CompositeContext compositeContext = callbackEPR.getCompositeContext();
+        registry = compositeContext.getExtensionPointRegistry(); // kept in a field; getMessageFactory() reads it
+       ProxyFactory proxyFactory = 
ExtensibleProxyFactory.getInstance(registry);
+       List<EndpointReference> eprList = new ArrayList<EndpointReference>();
+       eprList.add(callbackEPR); // the list handed to the factory holds only the callback EPR
+       ObjectFactory<?> factory = new 
CallbackReferenceObjectFactory(AsyncResponseHandler.class, proxyFactory, 
eprList);
+       
+       return (ServiceReference<AsyncResponseHandler<?>>) 
factory.getInstance();
+       
+    } // end method getAsyncCallbackRef
+       
+       /**
+        * Sets the values of various headers in the response message.
+        * <p>
+        * Uses the message returned by ThreadMessageContext.getMessageContext(),
+        * or a newly created message when that is null, stores the messageID
+        * field under the WS_MESSAGE_ID header and then hands the message back
+        * to ThreadMessageContext.setMessageContext().
+        */
+       private void setResponseHeaders() {
+               // Is there an existing message context?
+               Message msgContext = ThreadMessageContext.getMessageContext();
+               if( msgContext == null ) {
+                       // Create a message context
+                       // NOTE(review): getMessageFactory() dereferences the registry field, which
+                       // getAsyncCallbackRef() leaves unset when there is no "ASYNC_CALLBACK" header
+                       msgContext = getMessageFactory().createMessage();
+               } // end if
+               
+               // Add in the header for the RelatesTo Message ID
+               // (messageID is the WS_MESSAGE_ID header value read from the request in the constructor)
+               msgContext.getHeaders().put(WS_MESSAGE_ID, messageID);
+               
+               ThreadMessageContext.setMessageContext(msgContext);
+       } // end method setResponseHeaders
+       
+       private MessageFactory getMessageFactory() { // looks up the MessageFactory through the registry field
+        FactoryExtensionPoint modelFactories = 
registry.getExtensionPoint(FactoryExtensionPoint.class);
+        return modelFactories.getFactory(MessageFactory.class); // NOTE(review): registry is only assigned in getAsyncCallbackRef() - confirm it is non-null here
+       } // end method getMessageFactory
 }


Reply via email to