Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267 ============================================================================== --- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java (original) +++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007 @@ -19,6 +19,11 @@ package org.apache.ode.bpel.engine; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import javax.wsdl.Operation; import javax.xml.namespace.QName; @@ -35,6 +40,8 @@ import org.apache.ode.bpel.o.OPartnerLink; import org.w3c.dom.Element; +import com.sun.corba.se.spi.activation._ActivatorImplBase; + /** * Base-class implementation of the interface used to expose a partner invocation to the integration layer. * @@ -51,22 +58,33 @@ protected volatile String _foreignKey; + protected Lock _accessLock = new ReentrantLock(); + + protected Condition _stateChanged = _accessLock.newCondition(); + protected Condition _acked = _accessLock.newCondition(); + private QName _caller; - /** thread-local indicator telling us if a given thread is the thread that "owns" the object. */ - final ThreadLocal<Boolean> _ownerThread = new ThreadLocal<Boolean>() { - @Override - protected Boolean initialValue() { - return false; - } + /** the states for a partner mex. */ + enum State { + /** state when we're in one of the MexContext.invokeXXX methods. */ + INVOKE_XXX, + /** hold all actions (blocks the IL) */ + HOLD, + + /** the MEX is ASYNC ("in the wild"), i.e. a response can come at any momemnt from any thread. */ + ASYNC, + + /** the MEX is dead, it should no longer be accessed by the IL */ + DEAD }; - volatile boolean _blocked = false; + protected State _state = State.INVOKE_XXX; - PartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, + PartnerRoleMessageExchangeImpl(BpelProcess process, Long iid, String mexId, OPartnerLink oplink, Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) { - super(process, mexId, oplink, oplink.partnerRolePortType, operation); + super(process, iid, mexId, oplink, oplink.partnerRolePortType, operation); _myRoleEPR = myRoleEPR; _partnerRoleChannel = channel; } @@ -78,49 +96,75 @@ } @Override - public void save(MessageExchangeDAO dao) { + void save(MessageExchangeDAO dao) { super.save(dao); } + @Override + void ack(AckType acktype) { + _accessLock.lock(); + try { + super.ack(acktype); + _acked.signalAll(); + } finally { + _accessLock.unlock(); + } + } + public void replyAsync(String foreignKey) { - throw new BpelEngineException("replyAsync() is not supported for invocation style " + getInvocationStyle()); + throw new IllegalStateException("replyAsync() is not supported for invocation style " + getInvocationStyle()); } public void replyOneWayOk() { if (__log.isDebugEnabled()) { __log.debug("replyOneWayOk mex=" + getMessageExchangeId()); } - sync(); - checkReplyContextOk(); - setStatus(Status.ASYNC); - sync(); + + _accessLock.lock(); + try { + checkReplyContextOk(); + ack(AckType.ONEWAY); + } finally { + _accessLock.unlock(); + } } public void replyWithFault(QName faultType, Message outputFaultMessage) throws BpelEngineException { if (__log.isDebugEnabled()) { __log.debug("replyWithFault mex=" + getMessageExchangeId()); } - sync(); - checkReplyContextOk(); - _fault = faultType; - _response = (MessageImpl) outputFaultMessage; - setStatus(Status.FAULT); - sync(); - if (!_blocked) - resumeInstance(); + + _accessLock.lock(); + try { + checkReplyContextOk(); + _fault = faultType; + _failureType = null; + _response = (MessageImpl) outputFaultMessage; + ack(AckType.FAULT); + if (_state == State.ASYNC) + asyncACK(); + } finally { + _accessLock.unlock(); + } } public void reply(Message response) throws BpelEngineException { if (__log.isDebugEnabled()) { __log.debug("reply mex=" + getMessageExchangeId()); } - sync(); - checkReplyContextOk(); - _response = (MessageImpl) response; - setStatus(Status.RESPONSE); - sync(); - if (!_blocked) - resumeInstance(); + + _accessLock.lock(); + try { + checkReplyContextOk(); + _response = (MessageImpl) response; + _fault = null; + _failureType = null; + ack(AckType.RESPONSE); + if (_state == State.ASYNC) + asyncACK(); + } finally { + _accessLock.unlock(); + } } @@ -128,14 +172,20 @@ if (__log.isDebugEnabled()) { __log.debug("replyWithFailure mex=" + getMessageExchangeId()); } - sync(); - checkReplyContextOk(); - _failureType = type; - _explanation = description; - setStatus(Status.FAILURE); - sync(); - if (!_blocked) - resumeInstance(); + + _accessLock.lock(); + try { + checkReplyContextOk(); + _failureType = type; + _explanation = description; + _fault = null; + _response = null; + ack(AckType.FAILURE); + if (_state == State.ASYNC) + asyncACK(); + } finally { + _accessLock.unlock(); + } } public QName getCaller() { @@ -168,28 +218,47 @@ * for ASYNC and RELIABLE invocations. * */ - protected void resumeInstance() { - assert false : "should not get resumeInstance() call"; - throw new IllegalStateException("InternalError: unexpected state"); - } + protected abstract void asyncACK(); + + + protected void checkReplyContextOk() { + // Prevent duplicate replies. + while (_state == State.HOLD) + try { + _stateChanged.await(); + } catch (InterruptedException e) { + throw new BpelEngineException("Thread Interrupted.", e); + } - protected WorkEvent generateInvokeResponseWorkEvent() { - WorkEvent we = new WorkEvent(); - we.setProcessId(_process.getPID()); - we.setIID(_iid); - we.setType(WorkEvent.Type.PARTNER_RESPONSE); - we.setChannel(_responseChannel); - we.setMexId(_mexId); + if (_state == State.DEAD) + throw new IllegalStateException("Object used in inappropriate context. "); - return we; + if (getStatus() != MessageExchange.Status.REQ) + throw new IllegalStateException("Invalid message exchange state, expect REQUEST or ASYNC, but got " + getStatus()); } - protected void checkReplyContextOk() { - // Prevent duplicate replies. - if (getStatus() != MessageExchange.Status.REQUEST && getStatus() != MessageExchange.Status.ASYNC) - throw new BpelEngineException("Invalid message exchange state, expect REQUEST or ASYNC, but got " + getStatus()); - + void setState(State newstate) { + _accessLock.lock(); + try { + _state = newstate; + _stateChanged.signalAll(); + } finally { + _accessLock.unlock(); + } + } + + public boolean waitForAck(long timeout) throws InterruptedException { + _accessLock.lock(); + try { + if (getStatus() != Status.ACK) + return _acked.await(timeout,TimeUnit.MILLISECONDS); + else + return true; + } finally { + _accessLock.unlock(); + } } + }
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267 ============================================================================== --- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java (original) +++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007 @@ -19,33 +19,17 @@ package org.apache.ode.bpel.engine; +import javax.wsdl.Operation; +import javax.xml.namespace.QName; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ode.bpel.dao.MessageDAO; -import org.apache.ode.bpel.dao.MessageExchangeDAO; import org.apache.ode.bpel.iapi.BpelEngineException; import org.apache.ode.bpel.iapi.InvocationStyle; -import org.apache.ode.bpel.iapi.Message; -import org.apache.ode.bpel.iapi.MessageExchange; import org.apache.ode.bpel.iapi.MyRoleMessageExchange; -import org.apache.ode.bpel.iapi.Scheduler; -import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus; -import org.apache.ode.bpel.intercept.AbortMessageExchangeException; -import org.apache.ode.bpel.intercept.FaultMessageExchangeException; import org.apache.ode.bpel.intercept.InterceptorInvoker; -import org.apache.ode.bpel.intercept.MessageExchangeInterceptor; -import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext; import org.apache.ode.bpel.o.OPartnerLink; -import javax.wsdl.Operation; -import javax.xml.namespace.QName; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - /** * Provides an implementation of the [EMAIL PROTECTED] MyRoleMessageExchange} inteface for interactions performed in the * [EMAIL PROTECTED] InvocationStyle#RELIABLE} style. @@ -68,7 +52,7 @@ assertTransaction(); // Cover the case where invoke was already called. - if (getStatus() == Status.REQUEST) + if (getStatus() == Status.REQ) return; if (getStatus() != Status.NEW) @@ -80,7 +64,9 @@ if (__log.isDebugEnabled()) __log.debug("invoke() EPR= " + _epr + " ==> " + _process); - setStatus(Status.REQUEST); + + request(); + save(getDAO()); scheduleInvoke(); } Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267 ============================================================================== --- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java (original) +++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007 @@ -3,56 +3,69 @@ import javax.wsdl.Operation; import org.apache.ode.bpel.dao.MessageExchangeDAO; -import org.apache.ode.bpel.engine.MessageExchangeImpl.InDbAction; import org.apache.ode.bpel.iapi.BpelEngineException; import org.apache.ode.bpel.iapi.EndpointReference; import org.apache.ode.bpel.iapi.InvocationStyle; import org.apache.ode.bpel.iapi.PartnerRoleChannel; -import org.apache.ode.bpel.iapi.MessageExchange.Status; import org.apache.ode.bpel.o.OPartnerLink; public class ReliablePartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl { - public ReliablePartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation op, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel partnerRoleChannel) { - super(process, mexId, oplink, op, epr, myRoleEPR, partnerRoleChannel); + public ReliablePartnerRoleMessageExchangeImpl(BpelProcess process, long iid, String mexId, OPartnerLink oplink, Operation op, + EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel partnerRoleChannel) { + super(process, iid, mexId, oplink, op, epr, myRoleEPR, partnerRoleChannel); } - @Override protected void checkReplyContextOk() { super.checkReplyContextOk(); - + if (!_contexts.isTransacted()) throw new BpelEngineException("Cannot replyXXX from non-transaction context!"); } - @Override public void replyAsync(String foreignKey) { - if (!_blocked) - throw new BpelEngineException("Invalid context for replyAsync(); can only be called during MessageExchangeContext call. "); - checkReplyContextOk(); - setStatus(Status.ASYNC); - _foreignKey = foreignKey; + _accessLock.lock(); + try { + checkReplyContextOk(); + + if (_state != State.INVOKE_XXX) + throw new IllegalStateException( + "Invalid context for replyAsync(); can only be called during MessageExchangeContext call. "); + + _foreignKey = foreignKey; + } finally { + _accessLock.unlock(); + } } - @Override - protected void resumeInstance() { + protected void asyncACK() { // TODO Auto-generated method stub assert _contexts.isTransacted() : "checkReplyContext() should have prevented us from getting here."; assert !_process.isInMemory() : "resumeInstance() for reliable in-mem processes makes no sense."; - final WorkEvent we = generateInvokeResponseWorkEvent(); - - save(getDAO()); + MessageExchangeDAO mexdao = getDAO(); + final WorkEvent we = generatePartnerResponseWorkEvent(mexdao); + save(mexdao); _contexts.scheduler.schedulePersistedJob(we.getDetail(), null); } + @Override public InvocationStyle getInvocationStyle() { return InvocationStyle.RELIABLE; } - - + + private WorkEvent generatePartnerResponseWorkEvent(MessageExchangeDAO mexdao) { + WorkEvent we = new WorkEvent(); + we.setProcessId(_process.getPID()); + we.setChannel(mexdao.getChannel()); + we.setIID(_iid); + we.setMexId(_mexId); + we.setType(WorkEvent.Type.PARTNER_RESPONSE); + return we; + } + } Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267 ============================================================================== --- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java (original) +++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007 @@ -28,7 +28,7 @@ assertTransaction(); _process.invokeProcess(getDAO()); - if (MessageExchange.Status.valueOf(getDAO().getStatus()) != Status.RESPONSE) + if (MessageExchange.Status.valueOf(getDAO().getStatus()) != Status.ACK) throw new BpelEngineException("Transactional invoke on process did not yield a response."); return Status.valueOf(getDAO().getStatus()); Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267 ============================================================================== --- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java (original) +++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007 @@ -18,8 +18,8 @@ */ public class TransactedPartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl { - TransactedPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation,EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) { - super(process, mexId, oplink, operation, epr, myRoleEPR, channel); + TransactedPartnerRoleMessageExchangeImpl(BpelProcess process, long iid, String mexId, OPartnerLink oplink,Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) { + super(process, iid, mexId, oplink, operation, epr, myRoleEPR, channel); } @@ -31,10 +31,8 @@ */ @Override protected void checkReplyContextOk() { - if (!_blocked) - throw new BpelEngineException("replyXXX operation attempted outside of BLOCKING region!"); - if (!_ownerThread.get()) - throw new BpelEngineException("replyXXX operation attempted from foreign thread!"); + if (_state != State.INVOKE_XXX) + throw new BpelEngineException("replyXXX operation attempted outside of transacted region!"); assert _contexts.isTransacted() : "Internal Error: owner thread must be transactional!?!?!!?"; } @@ -43,6 +41,13 @@ @Override public InvocationStyle getInvocationStyle() { return InvocationStyle.TRANSACTED; + } + + + @Override + protected void asyncACK() { + throw new IllegalStateException("Async responses not supported for transaction invocations."); + } Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267 ============================================================================== --- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java (original) +++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007 @@ -35,17 +35,11 @@ /** * Override the setStatus(...) to notify our future when there is a response/failure. */ - protected void setStatus(Status status) { + protected void ack(AckType acktype) { Status old = getStatus(); - super.setStatus(status); + super.ack(acktype); if (_future != null) { - if (getMessageExchangePattern() == MessageExchangePattern.REQUEST_ONLY) { - if (old == Status.REQUEST && old != status) - _future.done(status); - } else /* two-way */ { - if ((old == Status.ASYNC || old == Status.REQUEST) && status != Status.ASYNC) - _future.done(status); - } + _future.done(Status.ACK); } } @@ -60,7 +54,7 @@ _process.enqueueTransaction(new Callable<Void>() { public Void call() throws Exception { - UnreliableMyRoleMessageExchangeImpl.super.setStatus(Status.REQUEST); + request(); MessageExchangeDAO dao = _process.createMessageExchange(getMessageExchangeId(), MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE); save(dao); if (_process.isInMemory()) Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267 ============================================================================== --- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java (original) +++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007 @@ -5,109 +5,87 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ode.bpel.dao.MessageExchangeDAO; -import org.apache.ode.bpel.engine.MessageExchangeImpl.InDbAction; import org.apache.ode.bpel.iapi.BpelEngineException; import org.apache.ode.bpel.iapi.EndpointReference; import org.apache.ode.bpel.iapi.InvocationStyle; -import org.apache.ode.bpel.iapi.MessageExchangeContext; import org.apache.ode.bpel.iapi.PartnerRoleChannel; import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; -import org.apache.ode.bpel.iapi.MessageExchange.Status; import org.apache.ode.bpel.o.OPartnerLink; /** - * Implementation of the [EMAIL PROTECTED] PartnerRoleMessageExchange} interface that is passed to the IL when the - * UNRELIABLE invocation style is used (see [EMAIL PROTECTED] InvocationStyle#UNRELIABLE}). The basic idea here is - * that with this style, the IL performs the operation outside of a transactional context. It can either - * finish it right away (BLOCK) or indicate that the response will be provided later (replyASYNC). - * - * - * TODO: serious synchronization issues in this class. - * + * Implementation of the [EMAIL PROTECTED] PartnerRoleMessageExchange} interface that is passed to the IL when the UNRELIABLE invocation style + * is used (see [EMAIL PROTECTED] InvocationStyle#UNRELIABLE}). The basic idea here is that with this style, the IL performs the operation + * outside of a transactional context. It can either finish it right away (BLOCK) or indicate that the response will be provided + * later (replyASYNC). + * + * + * * @author Maciej Szefler <mszefler at gmail dot com> - * + * */ public class UnreliablePartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl { private static final Log __log = LogFactory.getLog(UnreliablePartnerRoleMessageExchangeImpl.class); - + boolean _asyncReply; - UnreliablePartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) { - super(process, mexId, oplink, operation, epr, myRoleEPR, channel); + UnreliablePartnerRoleMessageExchangeImpl(BpelProcess process, long iid, String mexId, OPartnerLink oplink, Operation operation, + EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) { + super(process, iid, mexId, oplink, operation, epr, myRoleEPR, channel); } - @Override public InvocationStyle getInvocationStyle() { return InvocationStyle.UNRELIABLE; } - @Override - protected void resumeInstance() { + protected void asyncACK() { assert !_contexts.isTransacted() : "checkReplyContext() should have prevented us from getting here."; assert !_process.isInMemory() : "resumeInstance() for in-mem processes makes no sense."; - final WorkEvent we = generateInvokeResponseWorkEvent(); if (__log.isDebugEnabled()) { - __log.debug("resumeInstance: scheduling WorkEvent " + we); + __log.debug("asyncResponseReceived: for IID " + getIID() ); } - - doInTX(new InDbAction<Void>() { - - public Void call(MessageExchangeDAO mexdao) { - save(mexdao); - _contexts.scheduler.schedulePersistedJob(we.getDetail(), null); - return null; + _process.scheduleInstanceWork(getIID(), _process._server.new TransactedRunnable(new Runnable() { + public void run() { + MessageExchangeDAO dao = getDAO(); + save(dao); + _process.executeContinueInstancePartnerRoleResponseReceived(dao); } - }); + + })); } @Override protected void checkReplyContextOk() { super.checkReplyContextOk(); - if (!_blocked && getStatus() != Status.ASYNC) - throw new BpelEngineException("replyXXX operation attempted outside of BLOCKING region!"); - - // Prevent user from attempting the replyXXXX calls while a transaction is active. + // Prevent user from attempting the replyXXXX calls while a transaction is active. if (_contexts.isTransacted()) throw new BpelEngineException("Cannot reply to UNRELIABLE style invocation from a transactional context!"); - } - - @Override public void replyAsync(String foreignKey) { - if (__log.isDebugEnabled()) + if (__log.isDebugEnabled()) __log.debug("replyAsync mex=" + _mexId); - sync(); - - if (!_blocked) - throw new BpelEngineException("Invalid context for replyAsync(); can only be called during MessageExchangeContext call. "); - - // TODO: shouldn't this set _blocked? - - checkReplyContextOk(); - setStatus(Status.ASYNC); - _foreignKey = foreignKey; - sync(); - - } - - - /** - * Method used by server to wait until a response is available. - */ - Status waitForResponse() { - // TODO: actually wait for response. - return getStatus(); + _accessLock.lock(); + try { + checkReplyContextOk(); + + if (_state != State.INVOKE_XXX) + throw new IllegalStateException( + "Invalid context for replyAsync(); can only be called during MessageExchangeContext call. "); + + _asyncReply = true; + _foreignKey = foreignKey; + } finally { + _accessLock.unlock(); + } } - + } - Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?view=diff&rev=563267&r1=563266&r2=563267 ============================================================================== --- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java (original) +++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java Mon Aug 6 13:47:58 2007 @@ -32,6 +32,7 @@ import org.apache.ode.bpel.dao.PartnerLinkDAO; import org.apache.ode.bpel.dao.ProcessDAO; import org.apache.ode.bpel.dao.ProcessInstanceDAO; +import org.apache.ode.bpel.iapi.MessageExchange.AckType; import org.w3c.dom.Element; public class MessageExchangeDAOImpl extends DaoBaseImpl implements MessageExchangeDAO { @@ -61,6 +62,7 @@ private MessageExchangeDAO _pipedExchange; private String _failureType; private long _timeout; + private AckType _ackType; public MessageExchangeDAOImpl(char direction, String messageEchangeId){ this.direction = direction; @@ -299,5 +301,13 @@ public void setTimeout(long timeout) { _timeout = timeout; + } + + public AckType getAckType() { + return _ackType; + } + + public void setAckType(AckType ackType) { + _ackType = ackType; } }
