Author: rr
Date: Wed Aug 19 23:45:13 2009
New Revision: 806009
URL: http://svn.apache.org/viewvc?rev=806009&view=rev
Log:
ODE-654: Reply for MyRole MEX shouldn't deliver responses from rolled back jobs
fix
Modified:
ode/branches/APACHE_ODE_1.X/jbi/src/main/java/org/apache/ode/jbi/OdeService.java
Modified:
ode/branches/APACHE_ODE_1.X/jbi/src/main/java/org/apache/ode/jbi/OdeService.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/main/java/org/apache/ode/jbi/OdeService.java?rev=806009&r1=806008&r2=806009&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.X/jbi/src/main/java/org/apache/ode/jbi/OdeService.java
(original)
+++
ode/branches/APACHE_ODE_1.X/jbi/src/main/java/org/apache/ode/jbi/OdeService.java
Wed Aug 19 23:45:13 2009
@@ -24,6 +24,7 @@
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.iapi.Scheduler.Synchronizer;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.jbi.msgmap.Mapper;
import org.apache.ode.jbi.msgmap.MessageTranslationException;
@@ -181,13 +182,16 @@
* message exchange
*/
public void onResponse(MyRoleMessageExchange mex) {
- __log.debug("Consuming MEX tracker " + mex.getClientId());
- javax.jbi.messaging.MessageExchange jbiMex =
_jbiMexTracker.consume(mex.getClientId());
+ final String clientId = mex.getClientId();
+ final String mexId = mex.getMessageExchangeId();
+ __log.debug("Processing MEX tracker mexId: " + mexId + " clientId: " +
clientId);
+ final javax.jbi.messaging.MessageExchange jbiMex =
_jbiMexTracker.peek(clientId);
if (jbiMex == null) {
- __log.warn("Ignoring unknown async reply: " + mex);
+ __log.warn("Ignoring unknown async reply. mexId: " + mexId + "
clientId: " + clientId);
return;
}
+ try {
switch (mex.getStatus()) {
case FAULT:
outResponseFault(mex, jbiMex);
@@ -199,9 +203,30 @@
outFailure(mex, jbiMex);
break;
default:
- __log.warn("Received ODE message exchange in unexpected state: " +
mex.getStatus());
+ __log.warn("Received ODE message exchange in unexpected state: " +
mex.getStatus() + " mexId: " + mexId + " clientId: " + clientId);
}
+
mex.release(mex.getStatus() == MessageExchange.Status.RESPONSE);
+ _ode._scheduler.registerSynchronizer(new Synchronizer() {
+ public void afterCompletion(boolean success) {
+ if (success) {
+ //Deliver reply to external world only if ODE scheduler's
job has completed successfully
+ try {
+ _ode.getChannel().send(jbiMex);
+ __log.debug("Consuming MEX tracker mexId: " + mexId + "
clientId: " + clientId);
+ _jbiMexTracker.consume(clientId);
+ } catch (MessagingException e) {
+ __log.error("Error delivering response from ODE to JBI
mexId: " + mexId + " clientId: " + clientId, e);
+ }
+ }
+ }
+
+ public void beforeCompletion() {
+ }
+ });
+ } catch (MessagingException e) {
+ __log.error("Error processing response from ODE to JBI mexId: " +
mexId + " clientId: " + clientId, e);
+ }
}
/**
@@ -255,7 +280,7 @@
}
} else {
__log.error("ODE MEX " + odeMex + " was unroutable.");
- sendError(jbiMex, new IllegalArgumentException("Unroutable
invocation."));
+ setError(jbiMex, new IllegalArgumentException("Unroutable
invocation."));
}
success = true;
@@ -279,18 +304,13 @@
}
- private void outFailure(MyRoleMessageExchange odeMex,
javax.jbi.messaging.MessageExchange jbiMex) {
- try {
- jbiMex.setError(new Exception("MEXFailure"));
- jbiMex.setStatus(ExchangeStatus.ERROR);
- // TODO: get failure codes out of the message.
- _ode.getChannel().send(jbiMex);
- } catch (MessagingException ex) {
- __log.fatal("Error bridging ODE out response: ", ex);
- }
+ private void outFailure(MyRoleMessageExchange odeMex,
javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException {
+ jbiMex.setError(new Exception("MEXFailure"));
+ jbiMex.setStatus(ExchangeStatus.ERROR);
+ // TODO: get failure codes out of the message.
}
- private void outResponse(MyRoleMessageExchange mex,
javax.jbi.messaging.MessageExchange jbiMex) {
+ private void outResponse(MyRoleMessageExchange mex,
javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException {
InOut inout = (InOut) jbiMex;
try {
@@ -307,19 +327,13 @@
mapper.toNMS(nmsg, mex.getResponse(),
mex.getOperation().getOutput().getMessage(), null);
inout.setOutMessage(nmsg);
- _ode.getChannel().send(inout);
-
- } catch (MessagingException ex) {
- __log.error("Error bridging ODE out response: ", ex);
- sendError(jbiMex, ex);
} catch (MessageTranslationException e) {
__log.error("Error translating ODE message " + mex.getResponse() +
" to NMS format!", e);
- sendError(jbiMex, e);
+ setError(jbiMex, e);
}
}
- private void outResponseFault(MyRoleMessageExchange mex,
javax.jbi.messaging.MessageExchange jbiMex) {
-
+ private void outResponseFault(MyRoleMessageExchange mex,
javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException {
InOut inout = (InOut) jbiMex;
try {
@@ -336,29 +350,20 @@
QName fault = mex.getFault();
javax.wsdl.Fault wsdlFault =
mex.getOperation().getFault(fault.getLocalPart());
if (wsdlFault == null) {
- sendError(jbiMex, new MessageTranslationException("Unmapped
Fault : " + fault + ": " + mex.getFaultExplanation()));
+ setError(jbiMex, new MessageTranslationException("Unmapped
Fault : " + fault + ": " + mex.getFaultExplanation()));
} else {
mapper.toNMS(flt, mex.getFaultResponse(),
wsdlFault.getMessage(), fault);
inout.setFault(flt);
- _ode.getChannel().send(inout);
}
- } catch (MessagingException e) {
- __log.error("Error bridging ODE fault response: ", e);
- sendError(jbiMex, e);
} catch (MessageTranslationException mte) {
__log.error("Error translating ODE fault message " +
mex.getFaultResponse() + " to NMS format!", mte);
- sendError(jbiMex, mte);
+ setError(jbiMex, mte);
}
}
- private void sendError(javax.jbi.messaging.MessageExchange jbiMex,
Exception error) {
- try {
- jbiMex.setError(error);
- jbiMex.setStatus(ExchangeStatus.ERROR);
- _ode.getChannel().send(jbiMex);
- } catch (Exception e) {
- __log.error("Error sending ERROR status: ", e);
- }
+ private void setError(javax.jbi.messaging.MessageExchange jbiMex,
Exception error) throws MessagingException {
+ jbiMex.setError(error);
+ jbiMex.setStatus(ExchangeStatus.ERROR);
}
public Endpoint getEndpoint() {
@@ -381,9 +386,12 @@
return found;
}
+ synchronized javax.jbi.messaging.MessageExchange peek(String clientId)
{
+ return _outstandingJbiExchanges.get(clientId);
+ }
+
synchronized javax.jbi.messaging.MessageExchange consume(String
clientId) {
return _outstandingJbiExchanges.remove(clientId);
}
-
}
}