Author: mszefler
Date: Fri Aug 3 13:07:33 2007
New Revision: 562569
URL: http://svn.apache.org/viewvc?view=rev&rev=562569
Log:
removed the distinction between ASYNC/BLOCKING invocation style.
Added:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
- copied, changed from r562188,
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
- copied, changed from r561873,
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
Removed:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Fri Aug 3 13:07:33 2007
@@ -33,7 +33,6 @@
import java.util.concurrent.Future;
import javax.wsdl.Operation;
-import javax.wsdl.PortType;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
@@ -55,7 +54,6 @@
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
@@ -63,9 +61,7 @@
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
-import org.apache.ode.bpel.o.OElementVarType;
import org.apache.ode.bpel.o.OExpressionLanguage;
-import org.apache.ode.bpel.o.OMessageVarType;
import org.apache.ode.bpel.o.OPartnerLink;
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.bpel.o.Serializer;
@@ -155,10 +151,12 @@
// TODO : do this on a per-partnerlink basis, support transacted
styles.
HashSet<InvocationStyle> istyles = new HashSet<InvocationStyle>();
- istyles.add(InvocationStyle.BLOCKING);
+ istyles.add(InvocationStyle.UNRELIABLE);
+
if (!conf.isTransient()) {
- istyles.add(InvocationStyle.ASYNC);
istyles.add(InvocationStyle.RELIABLE);
+ } else {
+ istyles.add(InvocationStyle.TRANSACTED);
}
_invocationStyles = Collections.unmodifiableSet(istyles);
@@ -704,14 +702,11 @@
case RELIABLE:
mex = new ReliableMyRoleMessageExchangeImpl(this, mexId, oplink,
operation, target);
break;
- case ASYNC:
- mex = new AsyncMyRoleMessageExchangeImpl(this, mexId, oplink,
operation, target);
- break;
case TRANSACTED:
mex = new TransactedMyRoleMessageExchangeImpl(this, mexId, oplink,
operation, target);
break;
- case BLOCKING:
- mex = new BlockingMyRoleMessageExchangeImpl(this, mexId, oplink,
operation, target);
+ case UNRELIABLE:
+ mex = new UnreliableMyRoleMessageExchangeImpl(this, mexId, oplink,
operation, target);
break;
default:
throw new AssertionError("Unexpected invocation style: " + istyle);
@@ -768,15 +763,10 @@
OPartnerLink plink = (OPartnerLink)
_oprocess.getChild(mexdao.getPartnerLinkModelId());
Operation op =
plink.getPartnerRoleOperation(mexdao.getOperation());
switch (istyle) {
- case BLOCKING:
- mex = new BlockingPartnerRoleMessageExchangeImpl(this,
mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
+ case UNRELIABLE:
+ mex = new UnreliablePartnerRoleMessageExchangeImpl(this,
mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null,
getPartnerRoleChannel(plink));
break;
- case ASYNC:
- mex = new AsyncPartnerRoleMessageExchangeImpl(this,
mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
- plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null,
getPartnerRoleChannel(plink));
- break;
-
case TRANSACTED:
mex = new TransactedPartnerRoleMessageExchangeImpl(this,
mexdao.getMessageExchangeId(), plink, op, null, /*
* EPR
@@ -1111,6 +1101,8 @@
}
void fireMexStateEvent(MessageExchangeDAO mexdao, Status old, Status news)
{
+ // TODO: force a myrole mex to be created if it is not in cache.
+
if (old != news)
for (WeakReference<MyRoleMessageExchangeImpl> wr :
_mexStateListeners) {
MyRoleMessageExchangeImpl mymex = wr.get();
@@ -1118,6 +1110,5 @@
mymex.onStateChanged(mexdao, old, news);
}
- // TODO: need to call
MessageExchangeContext#onMyRoleMessageExchangeStateChanged
}
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
Fri Aug 3 13:07:33 2007
@@ -27,6 +27,9 @@
import java.util.List;
import java.util.Set;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
import javax.wsdl.Operation;
import javax.wsdl.PortType;
import javax.xml.namespace.QName;
@@ -89,6 +92,7 @@
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.Namespaces;
import org.apache.ode.utils.ObjectPrinter;
+import org.omg.CORBA._PolicyStub;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -125,7 +129,7 @@
private List<MyRoleMessageExchangeImpl> _pendingMyRoleReplies = new
LinkedList<MyRoleMessageExchangeImpl>();
private BpelInstanceWorker _instanceWorker;
-
+
private BpelProcess _bpelProcess;
/** Five second maximum for continous execution. */
@@ -135,9 +139,7 @@
private boolean _executed;
- public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker,
- ProcessInstanceDAO dao,
- PROCESS PROCESS,
+ public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker,
ProcessInstanceDAO dao, PROCESS PROCESS,
MessageExchangeDAO instantiatingMessageExchange) {
_instanceWorker = instanceWorker;
_bpelProcess = instanceWorker._process;
@@ -174,6 +176,10 @@
}
}
+ public String toString() {
+ return "{BpelRuntimeCtx PID=" + _bpelProcess.getPID() + ", IID=" +
_iid + "}";
+ }
+
public Long getPid() {
return _iid;
}
@@ -537,37 +543,37 @@
Status previousStatus = Status.valueOf(myrolemex.getStatus());
myrolemex.setStatus(status.toString());
- _bpelProcess.fireMexStateEvent(myrolemex, previousStatus, status);
- if (myrolemex.getPipedMessageExchange() != null) /* p2p case */{
- MessageExchangeDAO pmex = myrolemex.getPipedMessageExchange();
+ doMyRoleResponse(myrolemex, previousStatus, status);
- if (BpelProcess.__log.isDebugEnabled()) {
- __log.debug("Replying to a p2p mex, myrole " + myrolemex + " -
partnerole " + pmex);
- }
-
- // In the p2p case we copy the status/response from one mex object
into the other.
- pmex.setResponse(myrolemex.getResponse());
- pmex.setStatus(myrolemex.getStatus());
- continuePartnerReplied(pmex);
- myrolemex.release();
+ sendEvent(evt);
+ }
+ private void doMyRoleResponse(MessageExchangeDAO myrolemex, Status
previousStatus, Status newStatus) {
+ myrolemex.setStatus(newStatus.toString());
+ if (myrolemex.getPipedMessageExchange() != null) /* p2p case */{
+ p2pResponse(myrolemex);
} else /* IL-mediated communication */{
- InvocationStyle istyle =
InvocationStyle.valueOf(myrolemex.getInvocationStyle());
- switch (istyle) {
- case RELIABLE:
- scheduleReliableResponse(myrolemex);
- break;
- case ASYNC:
- scheduleAsyncResponse(myrolemex);
- break;
- default:
- break;
- }
+ _bpelProcess.fireMexStateEvent(myrolemex, previousStatus,
newStatus);
+ }
+ }
+ /**
+ * Handle P2P responses.
+ * @param myrolemex
+ */
+ private void p2pResponse(MessageExchangeDAO myrolemex) {
+ MessageExchangeDAO pmex = myrolemex.getPipedMessageExchange();
+
+ if (BpelProcess.__log.isDebugEnabled()) {
+ __log.debug("Replying to a p2p mex, myrole " + myrolemex + " -
partnerole " + pmex);
}
- sendEvent(evt);
+ // In the p2p case we copy the status/response from one mex object
into the other.
+ pmex.setResponse(myrolemex.getResponse());
+ pmex.setStatus(myrolemex.getStatus());
+ continuePartnerReplied(pmex);
+ myrolemex.release();
}
/**
@@ -648,16 +654,26 @@
}
public void registerTimer(TimerResponseChannel timerChannel, Date
timeToFire) {
+
+ if (_bpelProcess.isInMemory())
+ throw new InvalidProcessException("Process not compatible with
in-memory execution.");
+
WorkEvent we = new WorkEvent();
we.setIID(_dao.getInstanceId());
+ we.setProcessId(_bpelProcess.getPID());
we.setChannel(timerChannel.export());
we.setType(WorkEvent.Type.TIMER);
_contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
}
private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKey
key) {
+
+ if (_bpelProcess.isInMemory())
+ throw new InvalidProcessException("Process not compatible with
in-memory execution.");
+
WorkEvent we = new WorkEvent();
we.setIID(_dao.getInstanceId());
+ we.setProcessId(_bpelProcess.getPID());
we.setType(WorkEvent.Type.MATCHER);
we.setCorrelatorId(correlatorId);
we.setCorrelationKey(key);
@@ -696,7 +712,8 @@
evt.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT);
- MessageExchangeDAO mexDao =
_dao.getConnection().createMessageExchange(new
GUID().toString(),MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
+ MessageExchangeDAO mexDao =
_dao.getConnection().createMessageExchange(new GUID().toString(),
+ MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
mexDao.setStatus(MessageExchange.Status.NEW.toString());
mexDao.setOperation(operation.getName());
mexDao.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
@@ -773,39 +790,83 @@
return;
}
- PortType portType = partnerLink.partnerLink.partnerRolePortType;
EndpointReference myRoleEpr = null; // TODO: how did we get this?
mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
mexDao.setStatus(MessageExchange.Status.REQUEST.toString());
Set<InvocationStyle> supportedStyles =
_contexts.mexContext.getSupportedInvocationStyle(partnerRoleChannel,
partnerEpr);
- if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
- // If RELIABLE is supported, this is easy, we just do it in-line.
- ReliablePartnerRoleMessageExchangeImpl reliableMex = new
ReliablePartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
- .getMessageExchangeId(), partnerLink.partnerLink,
operation, partnerEpr, myRoleEpr, partnerRoleChannel);
- _contexts.mexContext.invokePartnerReliable(reliableMex);
- } else if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
- // If TRANSACTED is supported, this is again easy, do it in-line.
- TransactedPartnerRoleMessageExchangeImpl transactedMex = new
TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
- mexDao.getMessageExchangeId(), partnerLink.partnerLink,
operation, partnerEpr, myRoleEpr, partnerRoleChannel);
- _contexts.mexContext.invokePartnerTransacted(transactedMex);
- } else if (supportedStyles.contains(InvocationStyle.BLOCKING)) {
- // For BLOCKING invocation, we defer the call until after commit
(unless idempotent).
- BlockingPartnerRoleMessageExchangeImpl blockingMex = new
BlockingPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
- .getMessageExchangeId(), partnerLink.partnerLink,
operation, partnerEpr, myRoleEpr, partnerRoleChannel);
- schedule(new BlockingInvoker(blockingMex));
- } else if (supportedStyles.contains(InvocationStyle.ASYNC)) {
- // For ASYNC style, we defer the call until after commit (unless
idempotent).
- AsyncPartnerRoleMessageExchangeImpl asyncMex = new
AsyncPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
- .getMessageExchangeId(), partnerLink.partnerLink,
operation, partnerEpr, myRoleEpr, partnerRoleChannel);
- schedule(new AsyncInvoker(asyncMex));
-
+
+ boolean oneway = MessageExchangePattern.valueOf(mexDao.getPattern())
== MessageExchangePattern.REQUEST_ONLY;
+
+ if (_bpelProcess.isInMemory()) {
+ // In-memory processes are a bit different, we're never going to
do any scheduling for them, so we'd
+ // prefer to have TRANSACTED invocation style.
+ if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+ // If TRANSACTED is supported, this is again easy, do it
in-line.
+ TransactedPartnerRoleMessageExchangeImpl transactedMex = new
TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
+ mexDao.getMessageExchangeId(),
partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+ partnerRoleChannel);
+ _contexts.mexContext.invokePartnerTransacted(transactedMex);
+ } else if (supportedStyles.contains(InvocationStyle.RELIABLE) &&
oneway) {
+ // We can do RELIABLE for in-mem, but only if they are one way.
+ ReliablePartnerRoleMessageExchangeImpl reliableMex = new
ReliablePartnerRoleMessageExchangeImpl(_bpelProcess,
+ mexDao.getMessageExchangeId(),
partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+ partnerRoleChannel);
+ _contexts.mexContext.invokePartnerReliable(reliableMex);
+
+ } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
+ // Need to cheat a little bit for in-memory processes; do the
invoke in-line, but first suspend
+ // the transaction so that the IL does not get confused.
+ Transaction tx;
+ try {
+ tx = _contexts.txManager.suspend();
+ } catch (Exception ex) {
+ throw new BpelEngineException("TxManager Error: cannot
suspend!", ex);
+ }
+ try {
+ UnreliablePartnerRoleMessageExchangeImpl unreliableMex =
new UnreliablePartnerRoleMessageExchangeImpl(_bpelProcess,
+ mexDao.getMessageExchangeId(),
partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+ partnerRoleChannel);
+ _contexts.mexContext.invokePartnerBlocking(unreliableMex);
+ unreliableMex.waitForResponse();
+ } finally {
+ try {
+ _contexts.txManager.resume(tx);
+ } catch (Exception e) {
+ throw new BpelEngineException("TxManager Error: cannot
resume!", e);
+ }
+ }
+ }
} else {
- // This really should not happen, indicates IL is screwy.
- __log.error("Integration Layer did not agree to any known
invocation style for EPR " + partnerEpr);
- mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
- mexDao.setStatus(Status.FAILURE.toString());
- mexDao.setFaultExplanation("NoMatchingStyle");
+ if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+
+ // If TRANSACTED is supported, this is again easy, do it
in-line. Also, this what we always do for
+ // in-mem processes (even if the IL claims to not support it.)
+ TransactedPartnerRoleMessageExchangeImpl transactedMex = new
TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
+ mexDao.getMessageExchangeId(),
partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+ partnerRoleChannel);
+ _contexts.mexContext.invokePartnerTransacted(transactedMex);
+ } else if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
+ // If RELIABLE is supported, this is easy, we just do it
in-line.
+ ReliablePartnerRoleMessageExchangeImpl reliableMex = new
ReliablePartnerRoleMessageExchangeImpl(_bpelProcess,
+ mexDao.getMessageExchangeId(),
partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+ partnerRoleChannel);
+ _contexts.mexContext.invokePartnerReliable(reliableMex);
+ } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
+ // For BLOCKING invocation, we defer the call until after
commit (unless idempotent).
+ UnreliablePartnerRoleMessageExchangeImpl blockingMex = new
UnreliablePartnerRoleMessageExchangeImpl(_bpelProcess,
+ mexDao.getMessageExchangeId(),
partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+ partnerRoleChannel);
+ // We schedule in-memory (no db) to guarantee "at most once"
semantics.
+ schedule(new UnreliableInvoker(blockingMex));
+ // TODO: how do we recover the invocation if system dies in
BlockingInvoker?
+ } else {
+ // This really should not happen, indicates IL is screwy.
+ __log.error("Integration Layer did not agree to any known
invocation style for EPR " + partnerEpr);
+
mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
+ mexDao.setStatus(Status.FAILURE.toString());
+ mexDao.setFaultExplanation("NoMatchingStyle");
+ }
}
}
@@ -844,7 +905,7 @@
else if
(target.getSupportedInvocationStyle(serviceName).contains(InvocationStyle.TRANSACTED))
style = InvocationStyle.TRANSACTED;
else
- style = InvocationStyle.BLOCKING;
+ style = InvocationStyle.UNRELIABLE;
} else /* persisted */{
if (operation.getOutput() != null
@@ -869,7 +930,8 @@
if (__log.isDebugEnabled())
__log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + "
partnerSessionId=" + partnerSessionId);
- MessageExchangeDAO myRoleMex = _bpelProcess.createMessageExchange(new
GUID().toString(),MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+ MessageExchangeDAO myRoleMex = _bpelProcess.createMessageExchange(new
GUID().toString(),
+ MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
myRoleMex.setCallee(serviceName);
myRoleMex.setPipedMessageExchange(partnerRoleMex);
myRoleMex.setOperation(partnerRoleMex.getOperation());
@@ -898,9 +960,8 @@
throw new BpelEngineException("MUST RUN IN TRANSACTION!");
if (_executed)
throw new IllegalStateException("cannot call execute() twice!");
-
+
long maxTime = System.currentTimeMillis() + _maxReductionTimeMs;
-
// Execute the process state reductions
boolean canReduce = true;
@@ -941,6 +1002,7 @@
try {
WorkEvent we = new WorkEvent();
we.setIID(_iid);
+ we.setProcessId(_bpelProcess.getPID());
we.setType(WorkEvent.Type.RESUME);
_contexts.scheduler.schedulePersistedJob(we.getDetail(),
new Date());
} catch (ContextException e) {
@@ -1139,15 +1201,7 @@
mexDao.setFaultExplanation(optionalFaultData.toString());
}
mexDao.setFaultExplanation("Process completed without
responding.");
-
- switch (istyle) {
- case RELIABLE:
- scheduleReliableResponse(mexDao);
- break;
- case ASYNC:
- scheduleAsyncResponse(mexDao);
- }
-
+ doMyRoleResponse(mexDao, status, Status.FAILURE);
}
}
}
@@ -1351,7 +1405,7 @@
* Attempt to match message exchanges on a correlator.
*
*/
- public void matcherEvent(String correlatorId, CorrelationKey ckey) {
+ void matcherEvent(String correlatorId, CorrelationKey ckey) {
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("MatcherEvent handling: correlatorId=" + correlatorId
+ ", ckey=" + ckey);
}
@@ -1387,19 +1441,6 @@
}
}
- /**
- * Add a scheduled ASYNC response.
- *
- * @param messageExchangeId
- */
- private void scheduleAsyncResponse(MessageExchangeDAO mexdao) {
- assert !_bpelProcess.isInMemory() : "Internal error; attempt to
schedule in-memory process";
- assert _contexts.isTransacted();
-
- final MyRoleMessageExchangeImpl mex =
_bpelProcess.recreateMyRoleMex(mexdao);
- _pendingMyRoleReplies.add(mex);
- }
-
private void scheduleReliableResponse(MessageExchangeDAO messageExchange) {
assert !_bpelProcess.isInMemory() : "Internal error; attempt to
schedule in-memory process";
@@ -1422,31 +1463,52 @@
private void continuePartnerReplied(MessageExchangeDAO pmex) {
}
-
- class BlockingInvoker implements Runnable {
- public BlockingInvoker(BlockingPartnerRoleMessageExchangeImpl
blockingMex) {
- // TODO Auto-generated constructor stub
+ /**
+ * Runnable that actually performs UNRELIABLE invokes on the partner.
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
+ class UnreliableInvoker implements Runnable {
+
+ UnreliablePartnerRoleMessageExchangeImpl _blockingMex;
+
+ public UnreliableInvoker(UnreliablePartnerRoleMessageExchangeImpl
blockingMex) {
+ _blockingMex = blockingMex;
}
public void run() {
- // TODO Auto-generated method stub
-
+ assert !_contexts.isTransacted();
+
+ // TODO: what happens if system fails right here? we'll need to
add a "retry" possibility
+
+ Runnable prc;
+ try {
+ _contexts.mexContext.invokePartnerBlocking(_blockingMex);
+ prc = new PartnerResponseContinuation(_blockingMex);
+ } catch (Exception ce) {
+ prc = new PartnerResponseContinuation(_blockingMex);
+ }
+
+ // Keep using the same thread to do the work, but note we need to
run this in a transaction.
+ _instanceWorker.enqueue(_bpelProcess._server.new
TransactedRunnable(prc));
}
-
+
}
-
- class AsyncInvoker implements Runnable {
- public AsyncInvoker(AsyncPartnerRoleMessageExchangeImpl asyncMex) {
- // TODO Auto-generated constructor stub
+ class PartnerResponseContinuation implements Runnable {
+
+ private UnreliablePartnerRoleMessageExchangeImpl _mex;
+
+ public
PartnerResponseContinuation(UnreliablePartnerRoleMessageExchangeImpl
blockingMex) {
+ _mex = blockingMex;
}
public void run() {
- // TODO Auto-generated method stub
-
+
}
-
+
}
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
Fri Aug 3 13:07:33 2007
@@ -171,6 +171,7 @@
WorkEvent we = new WorkEvent();
we.setIID(iid);
+ we.setProcessId(_process.getPID());
we.setType(WorkEvent.Type.RESUME);
_process._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
@@ -294,6 +295,7 @@
WorkEvent we = new WorkEvent();
we.setType(WorkEvent.Type.RESUME);
+ we.setProcessId(_process.getPID());
we.setIID(iid);
_process._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Fri Aug 3 13:07:33 2007
@@ -218,7 +218,6 @@
}
void serverFailed(FailureType type, String reason, Element details) {
- // TODO not using FailureType, nor details
_failureType = type;
_explanation = reason;
setStatus(Status.FAILURE);
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
Fri Aug 3 13:07:33 2007
@@ -175,6 +175,7 @@
protected WorkEvent generateInvokeResponseWorkEvent() {
WorkEvent we = new WorkEvent();
+ we.setProcessId(_process.getPID());
we.setIID(_iid);
we.setType(WorkEvent.Type.PARTNER_RESPONSE);
we.setChannel(_responseChannel);
@@ -189,9 +190,6 @@
if (getStatus() != MessageExchange.Status.REQUEST && getStatus() !=
MessageExchange.Status.ASYNC)
throw new BpelEngineException("Invalid message exchange state,
expect REQUEST or ASYNC, but got " + getStatus());
- // In-memory processe are special, they don't allow scheduling so any
replies must be delivered immediately.
- if (!_blocked && _process.isInMemory())
- throw new BpelEngineException("Cannot reply to in-memory process
outside of BLOCKING call");
}
}
Copied:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
(from r562188,
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java)
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=562569&p1=ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java&r1=562188&p2=ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java&r2=562569
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
Fri Aug 3 13:07:33 2007
@@ -11,8 +11,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.dao.MessageDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.o.OPartnerLink;
@@ -22,12 +22,13 @@
* @author Maciej Szefler <mszefler at gmail dot com>
*
*/
-public class AsyncMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
+public class UnreliableMyRoleMessageExchangeImpl extends
MyRoleMessageExchangeImpl {
private static final Log __log =
LogFactory.getLog(ReliableMyRoleMessageExchangeImpl.class);
+ boolean _done = false;
ResponseFuture _future;
- public AsyncMyRoleMessageExchangeImpl(BpelProcess process, String mexId,
OPartnerLink oplink, Operation operation, QName callee) {
+ public UnreliableMyRoleMessageExchangeImpl(BpelProcess process, String
mexId, OPartnerLink oplink, Operation operation, QName callee) {
super(process, mexId, oplink, operation, callee);
}
@@ -59,7 +60,7 @@
_process.enqueueTransaction(new Callable<Void>() {
public Void call() throws Exception {
- AsyncMyRoleMessageExchangeImpl.super.setStatus(Status.REQUEST);
+
UnreliableMyRoleMessageExchangeImpl.super.setStatus(Status.REQUEST);
MessageExchangeDAO dao =
_process.createMessageExchange(getMessageExchangeId(),
MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
save(dao);
if (_process.isInMemory())
@@ -76,6 +77,32 @@
}
+
+ @Override
+ public InvocationStyle getInvocationStyle() {
+ return InvocationStyle.UNRELIABLE;
+ }
+
+
+ @Override
+ public Status invokeBlocking() throws BpelEngineException,
TimeoutException {
+ if (_done)
+ return getStatus();
+
+ Future<Status> future = _future != null ? _future :
super.invokeAsync();
+
+ try {
+ future.get(Math.max(_timeout,1), TimeUnit.MILLISECONDS);
+ _done = true;
+ return getStatus();
+ } catch (InterruptedException e) {
+ throw new BpelEngineException(e);
+ } catch (ExecutionException e) {
+ throw new BpelEngineException(e.getCause());
+ }
+ }
+
+
private static class ResponseFuture implements Future<Status> {
private Status _status;
@@ -121,11 +148,6 @@
this.notifyAll();
}
}
- }
-
- @Override
- public InvocationStyle getInvocationStyle() {
- return InvocationStyle.ASYNC;
}
}
Copied:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
(from r561873,
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java)
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=562569&p1=ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java&r1=561873&p2=ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java&r2=562569
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
Fri Aug 3 13:07:33 2007
@@ -2,46 +2,109 @@
import javax.wsdl.Operation;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.engine.MessageExchangeImpl.InDbAction;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.o.OPartnerLink;
/**
* Implementation of the [EMAIL PROTECTED] PartnerRoleMessageExchange}
interface that is passed to the IL when the
- * BLOCKING invocation style is used (see [EMAIL PROTECTED]
InvocationStyle#BLOCKING}). The basic idea here is that
- * with this style, the IL performs the operation while blocking in the
- * [EMAIL PROTECTED]
MessageExchangeContext#invokePartner(org.apache.ode.bpel.iapi.PartnerRoleMessageExchange)}
method.
+ * UNRELIABLE invocation style is used (see [EMAIL PROTECTED]
InvocationStyle#UNRELIABLE}). The basic idea here is
+ * that with this style, the IL performs the operation outside of a
transactional context. It can either
+ * finish it right away (BLOCK) or indicate that the response will be provided
later (replyASYNC).
*
- * This InvocationStyle makes this class rather trivial.
+ *
+ * TODO: serious synchronization issues in this class.
*
* @author Maciej Szefler <mszefler at gmail dot com>
*
*/
-public class BlockingPartnerRoleMessageExchangeImpl extends
PartnerRoleMessageExchangeImpl {
+public class UnreliablePartnerRoleMessageExchangeImpl extends
PartnerRoleMessageExchangeImpl {
+ private static final Log __log =
LogFactory.getLog(UnreliablePartnerRoleMessageExchangeImpl.class);
+
- BlockingPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId,
OPartnerLink oplink, Operation operation, EndpointReference epr,
EndpointReference myRoleEPR, PartnerRoleChannel channel) {
+ UnreliablePartnerRoleMessageExchangeImpl(BpelProcess process, String
mexId, OPartnerLink oplink, Operation operation, EndpointReference epr,
EndpointReference myRoleEPR, PartnerRoleChannel channel) {
super(process, mexId, oplink, operation, epr, myRoleEPR, channel);
}
- /**
- * The criteria for issuing a replyXXX call on BLOCKING message exchanges
is that the replyXXX must come while the
- * engine is blocked in an
- * [EMAIL PROTECTED]
MessageExchangeContext#invokePartnerBlocking(org.apache.ode.bpel.iapi.PartnerRoleMessageExchange)}.
- * method.
- */
+
+ @Override
+ public InvocationStyle getInvocationStyle() {
+ return InvocationStyle.UNRELIABLE;
+ }
+
+
+ @Override
+ protected void resumeInstance() {
+ assert !_contexts.isTransacted() : "checkReplyContext() should have
prevented us from getting here.";
+ assert !_process.isInMemory() : "resumeInstance() for in-mem processes
makes no sense.";
+
+ final WorkEvent we = generateInvokeResponseWorkEvent();
+ if (__log.isDebugEnabled()) {
+ __log.debug("resumeInstance: scheduling WorkEvent " + we);
+ }
+
+
+ doInTX(new InDbAction<Void>() {
+
+ public Void call(MessageExchangeDAO mexdao) {
+ save(mexdao);
+ _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+ return null;
+ }
+ });
+ }
+
@Override
protected void checkReplyContextOk() {
- if (!_blocked)
+ super.checkReplyContextOk();
+
+ if (!_blocked && getStatus() != Status.ASYNC)
throw new BpelEngineException("replyXXX operation attempted
outside of BLOCKING region!");
+
+ // Prevent user from attempting the replyXXXX calls while a
transaction is active.
+ if (_contexts.isTransacted())
+ throw new BpelEngineException("Cannot reply to UNRELIABLE style
invocation from a transactional context!");
+
+
}
+
+
@Override
- public InvocationStyle getInvocationStyle() {
- return InvocationStyle.BLOCKING;
+ public void replyAsync(String foreignKey) {
+ if (__log.isDebugEnabled())
+ __log.debug("replyAsync mex=" + _mexId);
+
+ sync();
+
+ if (!_blocked)
+ throw new BpelEngineException("Invalid context for replyAsync();
can only be called during MessageExchangeContext call. ");
+
+ // TODO: shouldn't this set _blocked?
+
+ checkReplyContextOk();
+ setStatus(Status.ASYNC);
+ _foreignKey = foreignKey;
+ sync();
+
+ }
+
+
+ /**
+ * Method used by server to wait until a response is available.
+ */
+ Status waitForResponse() {
+ // TODO: actually wait for response.
+ return getStatus();
}