Author: rr
Date: Tue Jul 13 13:27:53 2010
New Revision: 963709
URL: http://svn.apache.org/viewvc?rev=963709&view=rev
Log:
ODE-556: Rejecting in-out operations immediately when there's no route found -
fixed P2P case
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/BpelRuntimeContext.java
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=963709&r1=963708&r2=963709&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Tue Jul 13 13:27:53 2010
@@ -31,6 +31,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import javax.wsdl.Fault;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
@@ -40,8 +41,11 @@ import org.apache.ode.bpel.common.FaultE
import org.apache.ode.bpel.common.ProcessState;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.engine.MyRoleMessageExchangeImpl;
import org.apache.ode.bpel.engine.extvar.ExternalVariableConf;
import org.apache.ode.bpel.engine.extvar.ExternalVariableManager;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
@@ -50,10 +54,13 @@ import org.apache.ode.bpel.explang.Evalu
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.bpel.iapi.Scheduler.JobType;
@@ -66,6 +73,7 @@ import org.apache.ode.bpel.o.OMessageVar
import org.apache.ode.bpel.o.OPartnerLink;
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.bpel.o.Serializer;
+import org.apache.ode.bpel.runtime.BpelRuntimeContext;
import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
import org.apache.ode.bpel.runtime.InvalidProcessException;
import org.apache.ode.bpel.runtime.PROCESS;
@@ -1157,4 +1165,59 @@ public class BpelProcess {
public int getVersion() {
return
Integer.parseInt(_pid.getLocalPart().substring(_pid.getLocalPart().lastIndexOf('-')
+ 1));
}
+
+ public void doAsyncReply(MyRoleMessageExchangeImpl m, BpelRuntimeContext
context) {
+ MessageExchangeDAO mex = m.getDAO();
+ PartnerRoleMessageExchange pmex = null;
+
+ if (mex.getPipedMessageExchangeId() != null) {
+ pmex = (PartnerRoleMessageExchange)
getEngine().getMessageExchange(mex.getPipedMessageExchangeId());
+ }
+
+ if (pmex != null) {
+ if (BpelProcess.__log.isDebugEnabled()) {
+ __log.debug("Replying to a p2p mex, myrole " + m + " -
partnerole " + pmex);
+ }
+
+ if (pmex.getStatus() == Status.ASYNC || pmex.getStatus() ==
Status.REQUEST) {
+ try {
+ switch (m.getStatus()) {
+ case FAILURE:
+ // We can't seem to get the failure out of the
myrole mex?
+
pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed",
null);
+ break;
+ case FAULT:
+ Fault fault =
pmex.getOperation().getFault(m.getFault().getLocalPart());
+ if (fault == null) {
+ __log.error("process " + this + " instance " +
(context != null ? context.getPid() : null) + " thrown unmapped fault in p2p
communication " + m.getFault() + " " + m.getFaultExplanation() + " - converted
to failure");
+
pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "process thrown
unmapped fault in p2p communication " + m.getFault() + " " +
m.getFaultExplanation() + " - converted to failure",
m.getFaultResponse().getMessage());
+ } else {
+ Message faultRes =
pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart())
+ .getMessage().getQName());
+
faultRes.setMessage(m.getResponse().getMessage());
+ pmex.replyWithFault(m.getFault(), faultRes);
+ }
+ break;
+ case RESPONSE:
+ Message response =
pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
+ response.setMessage(m.getResponse().getMessage());
+ pmex.reply(response);
+ break;
+ default:
+ __log.warn("Unexpected state: " + m.getStatus());
+ break;
+ }
+ } finally {
+ mex.release(this.isCleanupCategoryEnabled(m.getStatus() ==
MessageExchange.Status.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
+ }
+ } else {
+ __log.warn("Can't send response to a p2p mex: " + mex + "
partner mex: " + pmex);
+ }
+ } else {
+ if (context != null) context.checkInvokeExternalPermission();
+ this._engine._contexts.mexContext.onAsyncReply(m);
+ //mex.release(_bpelProcess.isCleanupCategoryEnabled(m.getStatus()
== MessageExchange.Status.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
+ }
+ }
+
}
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=963709&r1=963708&r2=963709&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Tue Jul 13 13:27:53 2010
@@ -529,61 +529,6 @@ public class BpelRuntimeContextImpl impl
}
}
- protected void doAsyncReply(MyRoleMessageExchangeImpl m) {
- MessageExchangeDAO mex = m.getDAO();
- PartnerRoleMessageExchange pmex = null;
-
- if (mex.getPipedMessageExchangeId() != null) {
- pmex = (PartnerRoleMessageExchange) _bpelProcess
-
.getEngine().getMessageExchange(mex.getPipedMessageExchangeId());
- }
-
- if (pmex != null) {
- if (BpelProcess.__log.isDebugEnabled()) {
- __log.debug("Replying to a p2p mex, myrole " + m + " -
partnerole " + pmex);
- }
-
- if (pmex.getStatus() == Status.ASYNC || pmex.getStatus() ==
Status.REQUEST) {
- try {
- switch (m.getStatus()) {
- case FAILURE:
- // We can't seem to get the failure out of the
myrole mex?
-
pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed",
null);
- break;
- case FAULT:
- Fault fault =
pmex.getOperation().getFault(m.getFault().getLocalPart());
- if (fault == null) {
- __log.error("process " + _bpelProcess + "
instance " + _iid + " thrown unmapped fault in p2p communication " +
m.getFault() + " " + m.getFaultExplanation() + " - converted to failure");
-
pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "process thrown
unmapped fault in p2p communication " + m.getFault() + " " +
m.getFaultExplanation() + " - converted to failure",
m.getFaultResponse().getMessage());
- } else {
- Message faultRes =
pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart())
- .getMessage().getQName());
-
faultRes.setMessage(m.getResponse().getMessage());
- pmex.replyWithFault(m.getFault(), faultRes);
- }
- break;
- case RESPONSE:
- Message response =
pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
- response.setMessage(m.getResponse().getMessage());
- pmex.reply(response);
- break;
- default:
- __log.warn("Unexpected state: " + m.getStatus());
- break;
- }
- } finally {
-
mex.release(_bpelProcess.isCleanupCategoryEnabled(m.getStatus() ==
MessageExchange.Status.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
- }
- } else {
- __log.warn("Can't send response to a p2p mex: " + mex + "
partner mex: " + pmex);
- }
- } else {
- checkInvokeExternalPermission();
- _bpelProcess._engine._contexts.mexContext.onAsyncReply(m);
- //mex.release(_bpelProcess.isCleanupCategoryEnabled(m.getStatus()
== MessageExchange.Status.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
- }
- }
-
public void reply(final PartnerLinkInstance plinkInstnace, final String
opName, final String mexId, Element msg,
QName fault) throws FaultException {
String mexRef = _imaManager.release(plinkInstnace, opName, mexId);
@@ -620,7 +565,7 @@ public class BpelRuntimeContextImpl impl
evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT);
}
- doAsyncReply(m);
+ _bpelProcess.doAsyncReply(m, this);
// send event
sendEvent(evt);
@@ -1211,7 +1156,7 @@ public class BpelRuntimeContextImpl impl
}
default:
mex.setFailure(FailureType.OTHER, "No response.",
null);
- doAsyncReply(mex);
+ _bpelProcess.doAsyncReply(mex, this);
}
}
}
@@ -1232,7 +1177,7 @@ public class BpelRuntimeContextImpl impl
mex.setFault(faultData.getFaultName(), message);
mex.setFaultExplanation(faultData.getExplanation());
- doAsyncReply(mex);
+ _bpelProcess.doAsyncReply(mex, this);
}
}
}
@@ -1245,7 +1190,7 @@ public class BpelRuntimeContextImpl impl
MyRoleMessageExchangeImpl mex = new
MyRoleMessageExchangeImpl(_bpelProcess, _bpelProcess._engine, mexDao);
_bpelProcess.initMyRoleMex(mex);
mex.setFailure(FailureType.OTHER, "No response.", null);
- doAsyncReply(mex);
+ _bpelProcess.doAsyncReply(mex, this);
}
}
}
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?rev=963709&r1=963708&r2=963709&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Tue Jul 13 13:27:53 2010
@@ -182,6 +182,7 @@ abstract class MessageExchangeImpl imple
// TODO not using FailureType, nor details
setStatus(Status.FAILURE);
getDAO().setFaultExplanation(reason);
+ responseReceived();
}
void setStatus(Status status) {
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?rev=963709&r1=963708&r2=963709&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Tue Jul 13 13:27:53 2010
@@ -258,7 +258,7 @@ public class PartnerLinkMyRoleImpl exten
if (!mex.isAsynchronous()) {
mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process
instance matching correlation keys.", null);
if (!OdeGlobalConfig.queueInOutMessages()) {
- _process._engine._contexts.mexContext.onAsyncReply(mex);
+ _process.doAsyncReply(mex, null);
}
} else {
// enqueue message with the last message route, as per the
comments in caller (@see BpelProcess.invokeProcess())
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/BpelRuntimeContext.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/BpelRuntimeContext.java?rev=963709&r1=963708&r2=963709&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/BpelRuntimeContext.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/BpelRuntimeContext.java Tue Jul 13 13:27:53 2010
@@ -309,4 +309,6 @@ public interface BpelRuntimeContext {
Date getCurrentEventDateTime();
ClassLoader getProcessClassLoader();
+
+ void checkInvokeExternalPermission();
}