Author: rr
Date: Wed Aug 19 23:45:13 2009
New Revision: 806009

URL: http://svn.apache.org/viewvc?rev=806009&view=rev
Log:
ODE-654: Reply for MyRole MEX shoudn't deliver responses from rolled back jobs 
fix

Modified:
    
ode/branches/APACHE_ODE_1.X/jbi/src/main/java/org/apache/ode/jbi/OdeService.java

Modified: 
ode/branches/APACHE_ODE_1.X/jbi/src/main/java/org/apache/ode/jbi/OdeService.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/main/java/org/apache/ode/jbi/OdeService.java?rev=806009&r1=806008&r2=806009&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/jbi/src/main/java/org/apache/ode/jbi/OdeService.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/jbi/src/main/java/org/apache/ode/jbi/OdeService.java
 Wed Aug 19 23:45:13 2009
@@ -24,6 +24,7 @@
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.iapi.Scheduler.Synchronizer;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.jbi.msgmap.Mapper;
 import org.apache.ode.jbi.msgmap.MessageTranslationException;
@@ -181,13 +182,16 @@
      *            message exchange
      */
     public void onResponse(MyRoleMessageExchange mex) {
-        __log.debug("Consuming MEX tracker " + mex.getClientId());
-        javax.jbi.messaging.MessageExchange jbiMex = 
_jbiMexTracker.consume(mex.getClientId());
+        final String clientId = mex.getClientId();
+        final String mexId = mex.getMessageExchangeId();
+        __log.debug("Processing MEX tracker mexId: " + mexId + " clientId: " + 
clientId);
+        final javax.jbi.messaging.MessageExchange jbiMex = 
_jbiMexTracker.peek(clientId);
         if (jbiMex == null) {
-            __log.warn("Ignoring unknown async reply: " + mex);
+            __log.warn("Ignoring unknown async reply. mexId: " + mexId + " 
clientId: " + clientId);
             return;
         }
 
+        try {
         switch (mex.getStatus()) {
         case FAULT:
             outResponseFault(mex, jbiMex);
@@ -199,9 +203,30 @@
             outFailure(mex, jbiMex);
             break;
         default:
-            __log.warn("Received ODE message exchange in unexpected state: " + 
mex.getStatus());
+            __log.warn("Received ODE message exchange in unexpected state: " + 
mex.getStatus() + " mexId: " + mexId + " clientId: " + clientId);
         }
+        
         mex.release(mex.getStatus() == MessageExchange.Status.RESPONSE);
+        _ode._scheduler.registerSynchronizer(new Synchronizer() {
+            public void afterCompletion(boolean success) {
+                if (success) {
+                    //Deliver reply to external world only if ODE scheduler's 
job has completed successfully
+                    try {
+                        _ode.getChannel().send(jbiMex);
+                    __log.debug("Consuming MEX tracker mexId: " + mexId + " 
clientId: " + clientId);
+                    _jbiMexTracker.consume(clientId);
+                    } catch (MessagingException e) {
+                        __log.error("Error delivering response from ODE to JBI 
mexId: " + mexId + " clientId: " + clientId, e);
+                    }
+                }
+            }
+
+            public void beforeCompletion() {
+            }
+        });
+        } catch (MessagingException e) {
+            __log.error("Error processing response from ODE to JBI mexId: " + 
mexId + " clientId: " + clientId, e);
+        }
     }
 
     /**
@@ -255,7 +280,7 @@
                 }
             } else {
                 __log.error("ODE MEX " + odeMex + " was unroutable.");
-                sendError(jbiMex, new IllegalArgumentException("Unroutable 
invocation."));
+                setError(jbiMex, new IllegalArgumentException("Unroutable 
invocation."));
             }
 
             success = true;
@@ -279,18 +304,13 @@
 
     }
 
-    private void outFailure(MyRoleMessageExchange odeMex, 
javax.jbi.messaging.MessageExchange jbiMex) {
-        try {
-            jbiMex.setError(new Exception("MEXFailure"));
-            jbiMex.setStatus(ExchangeStatus.ERROR);
-            // TODO: get failure codes out of the message.
-            _ode.getChannel().send(jbiMex);
-        } catch (MessagingException ex) {
-            __log.fatal("Error bridging ODE out response: ", ex);
-        }
+    private void outFailure(MyRoleMessageExchange odeMex, 
javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException {
+        jbiMex.setError(new Exception("MEXFailure"));
+        jbiMex.setStatus(ExchangeStatus.ERROR);
+        // TODO: get failure codes out of the message.
     }
 
-    private void outResponse(MyRoleMessageExchange mex, 
javax.jbi.messaging.MessageExchange jbiMex) {
+    private void outResponse(MyRoleMessageExchange mex, 
javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException {
         InOut inout = (InOut) jbiMex;
 
         try {
@@ -307,19 +327,13 @@
             mapper.toNMS(nmsg, mex.getResponse(), 
mex.getOperation().getOutput().getMessage(), null);
 
             inout.setOutMessage(nmsg);
-            _ode.getChannel().send(inout);
-
-        } catch (MessagingException ex) {
-            __log.error("Error bridging ODE out response: ", ex);
-            sendError(jbiMex, ex);
         } catch (MessageTranslationException e) {
             __log.error("Error translating ODE message " + mex.getResponse() + 
" to NMS format!", e);
-            sendError(jbiMex, e);
+            setError(jbiMex, e);
         }
     }
 
-    private void outResponseFault(MyRoleMessageExchange mex, 
javax.jbi.messaging.MessageExchange jbiMex) {
-
+    private void outResponseFault(MyRoleMessageExchange mex, 
javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException {
         InOut inout = (InOut) jbiMex;
 
         try {
@@ -336,29 +350,20 @@
             QName fault = mex.getFault();
             javax.wsdl.Fault wsdlFault = 
mex.getOperation().getFault(fault.getLocalPart());
             if (wsdlFault == null) {
-               sendError(jbiMex, new MessageTranslationException("Unmapped 
Fault : " + fault + ": " + mex.getFaultExplanation()));
+               setError(jbiMex, new MessageTranslationException("Unmapped 
Fault : " + fault + ": " + mex.getFaultExplanation()));
             } else {
                 mapper.toNMS(flt, mex.getFaultResponse(), 
wsdlFault.getMessage(), fault);
                 inout.setFault(flt);
-                _ode.getChannel().send(inout);
             }
-        } catch (MessagingException e) {
-            __log.error("Error bridging ODE fault response: ", e);
-            sendError(jbiMex, e);
         } catch (MessageTranslationException mte) {
             __log.error("Error translating ODE fault message " + 
mex.getFaultResponse() + " to NMS format!", mte);
-            sendError(jbiMex, mte);
+            setError(jbiMex, mte);
         }
     }
     
-    private void sendError(javax.jbi.messaging.MessageExchange jbiMex, 
Exception error) {
-        try {
-            jbiMex.setError(error);
-            jbiMex.setStatus(ExchangeStatus.ERROR);
-            _ode.getChannel().send(jbiMex);
-        } catch (Exception e) {
-            __log.error("Error sending ERROR status: ", e);
-        }
+    private void setError(javax.jbi.messaging.MessageExchange jbiMex, 
Exception error) throws MessagingException {
+        jbiMex.setError(error);
+        jbiMex.setStatus(ExchangeStatus.ERROR);
     }
 
     public Endpoint getEndpoint() {
@@ -381,9 +386,12 @@
             return found;
         }
 
+        synchronized javax.jbi.messaging.MessageExchange peek(String clientId) 
{
+            return _outstandingJbiExchanges.get(clientId);
+        }
+
         synchronized javax.jbi.messaging.MessageExchange consume(String 
clientId) {
             return _outstandingJbiExchanges.remove(clientId);
         }
-
     }
 }


Reply via email to