Author: mszefler
Date: Thu Aug 2 10:26:37 2007
New Revision: 562188
URL: http://svn.apache.org/viewvc?view=rev&rev=562188
Log:
Fix message exchanges status updates, to occur after commit
Added:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
(with props)
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
Thu Aug 2 10:26:37 2007
@@ -11,6 +11,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.o.OPartnerLink;
@@ -51,10 +52,14 @@
if (_future != null)
return _future;
+ if (_request == null)
+ throw new IllegalStateException("Must call setRequest(...)!");
+
_future = new ResponseFuture();
_process.enqueueTransaction(new Callable<Void>() {
public Void call() throws Exception {
+ AsyncMyRoleMessageExchangeImpl.super.setStatus(Status.REQUEST);
MessageExchangeDAO dao =
_process.createMessageExchange(getMessageExchangeId(),
MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
save(dao);
if (_process.isInMemory())
@@ -93,9 +98,7 @@
if (_status != null)
return _status;
- while (_status == null) {
- this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
- }
+ this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
if (_status == null)
throw new TimeoutException();
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
Thu Aug 2 10:26:37 2007
@@ -6,10 +6,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.runtime.PROCESS;
/**
* Objects used for synchronizing the execution of instance-level work. All
work on behalf of an instance is funneled to one of
@@ -34,6 +31,8 @@
private ArrayList<Runnable> _todoQueue = new ArrayList<Runnable>();
private final ThreadLocal<Long> _activeInstance = new ThreadLocal<Long>();
+
+ private Thread _workerThread;
BpelInstanceWorker(BpelProcess process, Long iid) {
_process = process;
@@ -51,11 +50,12 @@
* @param runnable
*/
synchronized void enqueue(Runnable runnable) {
+ __log.debug("Enqueue work for instance IID " + _iid + ": " + runnable);
_todoQueue.add(runnable);
// We mayh need to reschedule this thread if we've dropped out of the
end of the run() method.
if (!_running) {
_running = true;
- _process.scheduleRunnable(this);
+ _process.enqueueRunnable(this);
}
}
@@ -74,7 +74,8 @@
* @throws Exception
* forwarded from [EMAIL PROTECTED] Callable#call()}
*/
- synchronized <T> T execInCurrentThread(Callable<T> callable) throws
Exception {
+ <T> T execInCurrentThread(Callable<T> callable) throws Exception {
+ __log.debug("Importing thread " + Thread.currentThread() + " for IID "
+ _iid);
final Semaphore ready = new Semaphore(0);
final Semaphore finished = new Semaphore(0);
enqueue(new Runnable() {
@@ -88,6 +89,8 @@
}
}
});
+
+ __log.debug("Blocking main worker thread for IID " + _iid);
try {
ready.acquire();
} catch (InterruptedException ex) {
@@ -95,12 +98,14 @@
throw new BpelEngineException("Thread interrupted.", ex);
}
+ __log.debug("Executing worker for IID " + _iid + " in imported thread
" + Thread.currentThread());
_activeInstance.set(_iid);
try {
return callable.call();
} catch (Exception ex) {
throw ex;
} finally {
+ __log.debug("Releasing worker thread for IID " + _iid + " imported
thread " + Thread.currentThread());
finished.release();
_activeInstance.set(null);
}
@@ -113,7 +118,9 @@
* Implementation of the [EMAIL PROTECTED] Runnable} interface.
*/
public void run() {
+ __log.debug("Running worker thread " + Thread.currentThread() + " for
instance IID " + _iid);
_activeInstance.set(_iid);
+ _workerThread = Thread.currentThread();
try {
do {
@@ -123,10 +130,12 @@
// This is the only way to drop out of this method
short of some disasterous error. This is
// important since we need to synchronize _running
with _todoQueue state.
_running = false;
+ __log.debug("Worker thread " + Thread.currentThread()
+ " for instance IID " + _iid + " ran out of work. ");
return;
}
next = _todoQueue.remove(0);
+ __log.debug("Worker thread " + Thread.currentThread() + "
for instance IID " + _iid + " found work: " + next);
}
try {
@@ -137,11 +146,12 @@
} while (true);
} finally {
_activeInstance.set(null);
+ _workerThread = null;
}
}
public String toString() {
- return "{BpelInstanceWorker: PID=" + _process.getPID() + " IID=" +
_iid + "}";
+ return "{BpelInstanceWorker: PID=" + _process.getPID() + " IID=" +
_iid + " workerThread="+ _workerThread + "}";
}
public boolean isWorkerThread() {
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Thu Aug 2 10:26:37 2007
@@ -193,8 +193,9 @@
__log.error(errmsg);
mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_ENDPOINT.toString());
mexdao.setFaultExplanation(errmsg);
+ Status oldstatus = Status.valueOf(mexdao.getStatus());
mexdao.setStatus(Status.FAILURE.toString());
- fireMexStateEvent(mexdao);
+ fireMexStateEvent(mexdao, oldstatus, Status.FAILURE);
return;
}
@@ -242,7 +243,7 @@
private void executeCreateInstance(MessageExchangeDAO mexdao) {
assert _hydrationLatch.isLatched(1);
-
+
BpelInstanceWorker worker =
_instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
assert worker.isWorkerThread();
BpelRuntimeContextImpl instanceCtx = new
BpelRuntimeContextImpl(worker, mexdao.getInstance(), new PROCESS(_oprocess),
@@ -301,7 +302,7 @@
private PartnerLinkMyRoleImpl getMyRoleForService(QName serviceName) {
assert _hydrationLatch.isLatched(1);
-
+
for (Map.Entry<Endpoint, PartnerLinkMyRoleImpl> e :
_endpointToMyRoleMap.entrySet()) {
if (e.getKey().serviceName.equals(serviceName))
return e.getValue();
@@ -359,7 +360,6 @@
return null;
}
-
/**
* Process the message-exchange interceptors.
*
@@ -697,35 +697,31 @@
}
}
- private MyRoleMessageExchangeImpl newMyRoleMex(
- InvocationStyle istyle,
- String mexId,
- QName target,
- OPartnerLink oplink,
+ private MyRoleMessageExchangeImpl newMyRoleMex(InvocationStyle istyle,
String mexId, QName target, OPartnerLink oplink,
Operation operation) {
MyRoleMessageExchangeImpl mex;
switch (istyle) {
case RELIABLE:
- mex = new ReliableMyRoleMessageExchangeImpl(this, mexId,
oplink,operation, target);
+ mex = new ReliableMyRoleMessageExchangeImpl(this, mexId, oplink,
operation, target);
break;
case ASYNC:
- mex = new AsyncMyRoleMessageExchangeImpl(this,
mexId,oplink,operation,target);
+ mex = new AsyncMyRoleMessageExchangeImpl(this, mexId, oplink,
operation, target);
break;
case TRANSACTED:
- mex = new TransactedMyRoleMessageExchangeImpl(this, mexId,
oplink,operation,target);
+ mex = new TransactedMyRoleMessageExchangeImpl(this, mexId, oplink,
operation, target);
break;
case BLOCKING:
- mex = new BlockingMyRoleMessageExchangeImpl(this, mexId,
oplink,operation,target);
+ mex = new BlockingMyRoleMessageExchangeImpl(this, mexId, oplink,
operation, target);
break;
default:
throw new AssertionError("Unexpected invocation style: " + istyle);
}
-
+
registerMyRoleMex(mex);
return mex;
}
-
+
MyRoleMessageExchangeImpl recreateMyRoleMex(MessageExchangeDAO mexdao) {
InvocationStyle istyle =
InvocationStyle.valueOf(mexdao.getInvocationStyle());
@@ -733,26 +729,30 @@
try {
OPartnerLink plink = (OPartnerLink)
_oprocess.getChild(mexdao.getPartnerLinkModelId());
if (plink == null) {
- String errmsg = __msgs.msgDbConsistencyError("MexDao #"+
mexdao.getMessageExchangeId() + " referenced unknown pLinkModelId " +
mexdao.getPartnerLinkModelId());
+ String errmsg = __msgs.msgDbConsistencyError("MexDao #" +
mexdao.getMessageExchangeId()
+ + " referenced unknown pLinkModelId " +
mexdao.getPartnerLinkModelId());
__log.error(errmsg);
throw new BpelEngineException(errmsg);
}
-
- Operation op = plink.getMyRoleOperation(mexdao.getOperation());
+
+ Operation op = plink.getMyRoleOperation(mexdao.getOperation());
if (op == null) {
- String errmsg = __msgs.msgDbConsistencyError("MexDao #"+
mexdao.getMessageExchangeId() + " referenced unknown operation " +
mexdao.getOperation());
+ String errmsg = __msgs.msgDbConsistencyError("MexDao #" +
mexdao.getMessageExchangeId()
+ + " referenced unknown operation " +
mexdao.getOperation());
__log.error(errmsg);
- throw new BpelEngineException(errmsg);
+ throw new BpelEngineException(errmsg);
}
-
+
PartnerLinkMyRoleImpl myRole = _myRoles.get(plink);
if (myRole == null) {
- String errmsg = __msgs.msgDbConsistencyError("MexDao #"+
mexdao.getMessageExchangeId() + " referenced non-existant myrole");
+ String errmsg = __msgs.msgDbConsistencyError("MexDao #" +
mexdao.getMessageExchangeId()
+ + " referenced non-existant myrole");
__log.error(errmsg);
- throw new BpelEngineException(errmsg);
+ throw new BpelEngineException(errmsg);
}
-
- MyRoleMessageExchangeImpl mex = newMyRoleMex(istyle,
mexdao.getMessageExchangeId(), myRole._endpoint.serviceName, plink, op);
+
+ MyRoleMessageExchangeImpl mex = newMyRoleMex(istyle,
mexdao.getMessageExchangeId(), myRole._endpoint.serviceName,
+ plink, op);
mex.load(mexdao);
return mex;
} finally {
@@ -815,7 +815,7 @@
*/
private PartnerLinkMyRoleImpl getPartnerLinkForService(QName serviceName) {
assert _hydrationLatch.isLatched(1);
-
+
PartnerLinkMyRoleImpl target = null;
for (Endpoint endpoint : _endpointToMyRoleMap.keySet()) {
if (endpoint.serviceName.equals(serviceName))
@@ -825,9 +825,10 @@
return target;
}
-
+
/**
- * Used by [EMAIL PROTECTED] BpelRuntimeContextImpl} constructor. Should
only be called from latched context.
+ * Used by [EMAIL PROTECTED] BpelRuntimeContextImpl} constructor. Should
only be called from latched context.
+ *
* @return
*/
ReplacementMap getReplacementMap() {
@@ -1028,6 +1029,13 @@
_server.scheduleRunnable(new ProcessRunnable(runnable));
}
+ public void enqueueRunnable(BpelInstanceWorker worker) {
+ if (__log.isDebugEnabled())
+ __log.debug("enqueuRunnable for process " + _pid + ": " + worker);
+
+ _server.enqueueRunnable(new ProcessRunnable(worker));
+ }
+
class ProcessRunnable implements Runnable {
Runnable _work;
@@ -1066,11 +1074,9 @@
}
- public MyRoleMessageExchange createNewMyRoleMex(final InvocationStyle
istyle,
- final QName targetService,
- final String operation,
- final String clientKey) {
-
+ public MyRoleMessageExchange createNewMyRoleMex(final InvocationStyle
istyle, final QName targetService,
+ final String operation, final String clientKey) {
+
final String mexId = new GUID().toString();
_hydrationLatch.latch(1);
try {
@@ -1082,8 +1088,8 @@
if (op == null)
throw new BpelEngineException("NoSuchOperation: " + operation);
- return newMyRoleMex(istyle, mexId, target._endpoint.serviceName,
target._plinkDef, op);
-
+ return newMyRoleMex(istyle, mexId, target._endpoint.serviceName,
target._plinkDef, op);
+
} finally {
_hydrationLatch.release(1);
}
@@ -1092,24 +1098,25 @@
void registerMyRoleMex(MyRoleMessageExchangeImpl mymex) {
_mexStateListeners.add(new
WeakReference<MyRoleMessageExchangeImpl>(mymex));
}
-
+
void unregisterMyRoleMex(MyRoleMessageExchangeImpl mymex) {
ArrayList<WeakReference<MyRoleMessageExchangeImpl>> needsRemoval = new
ArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
- for (WeakReference<MyRoleMessageExchangeImpl> wref :
_mexStateListeners) {
+ for (WeakReference<MyRoleMessageExchangeImpl> wref :
_mexStateListeners) {
MyRoleMessageExchangeImpl mex = wref.get();
if (mex == null || mex == mymex)
needsRemoval.add(wref);
}
_mexStateListeners.removeAll(needsRemoval);
-
+
}
-
- void fireMexStateEvent(MessageExchangeDAO mexdao) {
- for (WeakReference<MyRoleMessageExchangeImpl> wr: _mexStateListeners)
{
- MyRoleMessageExchangeImpl mymex = wr.get();
- if (mymex != null && mymex.getMessageExchangeId() != null)
- mymex.onStateChanged(mexdao);
- }
+
+ void fireMexStateEvent(MessageExchangeDAO mexdao, Status old, Status news)
{
+ if (old != news)
+ for (WeakReference<MyRoleMessageExchangeImpl> wr :
_mexStateListeners) {
+ MyRoleMessageExchangeImpl mymex = wr.get();
+ if (mymex != null && mymex.getMessageExchangeId() != null)
+ mymex.onStateChanged(mexdao, old, news);
+ }
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
Thu Aug 2 10:26:37 2007
@@ -153,7 +153,7 @@
_outstandingRequests = new OutstandingRequestManager();
_vpu.setContext(_soup);
- byte[] daoState = dao.getExecutionState();
+ byte[] daoState = _bpelProcess.isInMemory() ? null :
dao.getExecutionState();
if (daoState != null) {
assert !_bpelProcess.isInMemory() : "did not expect to rehydrate
in-mem process!";
ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
@@ -535,8 +535,10 @@
evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT);
}
+ Status previousStatus = Status.valueOf(myrolemex.getStatus());
myrolemex.setStatus(status.toString());
-
+ _bpelProcess.fireMexStateEvent(myrolemex, previousStatus, status);
+
if (myrolemex.getPipedMessageExchange() != null) /* p2p case */{
MessageExchangeDAO pmex = myrolemex.getPipedMessageExchange();
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
Thu Aug 2 10:26:37 2007
@@ -157,6 +157,18 @@
if (_exec == null)
_exec = Executors.newCachedThreadPool();
+
+ if (_contexts.txManager == null) {
+ String errmsg = "Transaction manager not specified; call
setTransactionManager(...)!";
+ __log.fatal(errmsg);
+ throw new IllegalStateException(errmsg);
+ }
+
+ if (_contexts.scheduler == null) {
+ String errmsg = "Scheduler not specified; call
setScheduler(...)!";
+ __log.fatal(errmsg);
+ throw new IllegalStateException(errmsg);
+ }
_contexts.scheduler.start();
_state = State.RUNNING;
@@ -305,6 +317,9 @@
try {
BpelProcess p = _registeredProcesses.remove(pid);
+ if (p == null)
+ return;
+
p.deactivate();
while (_serviceMap.values().remove(p))
;
@@ -598,14 +613,26 @@
return _exec.submit(new ServerCallable<T>(new
TransactedCallable<T>(transaction)));
}
+ void enqueueRunnable(final Runnable runnable) {
+ _exec.submit(new ServerRunnable(runnable));
+ }
+
/**
* Schedule a [EMAIL PROTECTED] Runnable} object for execution after the
completion of the current transaction.
* @param runnable
*/
- void scheduleRunnable(Runnable runnable) {
+ void scheduleRunnable(final Runnable runnable) {
assertTransaction();
- _contexts.registerCommitSynchronizer(new ServerRunnable(runnable));
+ _contexts.registerCommitSynchronizer(new Runnable() {
+
+ public void run() {
+ _exec.submit(new ServerRunnable(runnable));
+ }
+
+ });
+
}
+
protected void assertTransaction() {
if (!_contexts.isTransacted())
@@ -730,6 +757,9 @@
_mngmtLock.readLock().lock();
try {
return _work.call();
+ } catch (Exception ex) {
+ __log.fatal("Internal Error", ex);
+ throw ex;
} finally {
_mngmtLock.readLock().unlock();
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
Thu Aug 2 10:26:37 2007
@@ -19,6 +19,8 @@
package org.apache.ode.bpel.engine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.iapi.BindingContext;
import org.apache.ode.bpel.iapi.BpelEngineException;
@@ -42,7 +44,8 @@
* Aggregation of all the contexts provided to the BPEL engine by the
integration layer.
*/
class Contexts {
-
+ private static final Log __log = LogFactory.getLog(Contexts.class);
+
TransactionManager txManager;
MessageExchangeContext mexContext;
@@ -104,14 +107,14 @@
try {
txManager.commit();
} catch (Exception ex) {
- throw new BpelEngineException("Could not commit.", ex);
+ __log.error("Commit failed.", ex);
+ throw new BpelEngineException("Commit failed.", ex);
}
else
try {
txManager.rollback();
} catch (Exception ex) {
- throw new BpelEngineException("Could not rollback.", ex);
-
+ __log.error("Transaction rollback failed.", ex);
}
}
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
Thu Aug 2 10:26:37 2007
@@ -125,7 +125,7 @@
/** Names of proprties that have been modified. */
final HashSet<String> _modifiedProperties = new HashSet<String>();
- private FailureType _failureType;
+ protected FailureType _failureType;
private Set<String> _propNames;
@@ -176,12 +176,14 @@
if (_changes.contains(Change.REQUEST)) {
MessageDAO requestDao = dao.createMessage(_request.getType());
- requestDao.setData(_request.getMessage());
+ requestDao.setData(_request.getMessage());
+ dao.setRequest(requestDao);
}
if (_changes.contains(Change.RESPONSE)) {
MessageDAO responseDao = dao.createMessage(_response.getType());
responseDao.setData(_response.getMessage());
+ dao.setResponse(responseDao);
}
if (_changes.contains(Change.EPR)) {
@@ -297,40 +299,7 @@
}
- void setFault(QName faultType, Message outputFaultMessage) throws
BpelEngineException {
- setStatus(Status.FAULT);
- _fault = faultType;
- _response = (MessageImpl) outputFaultMessage;
-
- _changes.add(Change.RESPONSE);
- }
-
- void setFaultExplanation(String explanation) {
- _explanation = explanation;
- }
-
- void setResponse(Message outputMessage) throws BpelEngineException {
- if (getStatus() != Status.REQUEST && getStatus() != Status.ASYNC)
- throw new IllegalStateException("Not in REQUEST state!");
-
- setStatus(Status.RESPONSE);
- _fault = null;
- _explanation = null;
- _response = (MessageImpl) outputMessage;
- _response.makeReadOnly();
- _changes.add(Change.RESPONSE);
-
- }
-
- void setFailure(FailureType type, String reason, Element details) throws
BpelEngineException {
- // TODO not using FailureType, nor details
- setStatus(Status.FAILURE);
- _failureType = type;
- _explanation = reason;
-
- _changes.add(Change.RESPONSE);
- }
-
+
void setStatus(Status status) {
_status = status;
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Thu Aug 2 10:26:37 2007
@@ -1,6 +1,5 @@
package org.apache.ode.bpel.engine;
-import java.lang.ref.WeakReference;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
@@ -9,37 +8,37 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.intercept.AbortMessageExchangeException;
import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
import org.apache.ode.bpel.o.OPartnerLink;
+import org.w3c.dom.Element;
abstract class MyRoleMessageExchangeImpl extends MessageExchangeImpl
implements MyRoleMessageExchange {
private static final Log __log =
LogFactory.getLog(MyRoleMessageExchangeImpl.class);
-
+
protected final QName _callee;
protected CorrelationStatus _cstatus;
protected String _clientId;
- public MyRoleMessageExchangeImpl(BpelProcess process,
- String mexId,
- OPartnerLink oplink,
- Operation operation,
- QName callee) {
+ public MyRoleMessageExchangeImpl(BpelProcess process, String mexId,
OPartnerLink oplink, Operation operation, QName callee) {
super(process, mexId, oplink, oplink.myRolePortType, operation);
_callee = callee;
}
-
+
public CorrelationStatus getCorrelationStatus() {
return _cstatus;
}
@@ -47,15 +46,16 @@
@Override
void load(MessageExchangeDAO dao) {
super.load(dao);
- _cstatus = CorrelationStatus.valueOf(dao.getCorrelationStatus());
+ _cstatus = dao.getCorrelationStatus() == null ? null :
CorrelationStatus.valueOf(dao.getCorrelationStatus());
_clientId = dao.getCorrelationId();
}
@Override
public void save(MessageExchangeDAO dao) {
super.save(dao);
- dao.setCorrelationStatus(_cstatus.toString());
+ dao.setCorrelationStatus(_cstatus == null ? null :
_cstatus.toString());
dao.setCorrelationId(_clientId);
+ dao.setCallee(_callee);
}
public String getClientId() {
@@ -102,28 +102,27 @@
}
protected void scheduleInvoke() {
-
+
assert !_process.isInMemory() : "Cannot schedule invokes for in-memory
processes.";
assert _contexts.isTransacted() : "Cannot schedule outside of
transaction context.";
-
+
// Schedule a new job for invocation
final WorkEvent we = new WorkEvent();
we.setType(WorkEvent.Type.MYROLE_INVOKE);
we.setProcessId(_process.getPID());
we.setMexId(_mexId);
- // Schedule a timeout
+ // Schedule a timeout
final WorkEvent we1 = new WorkEvent();
we1.setType(WorkEvent.Type.MYROLE_INVOKE_TIMEOUT);
we1.setProcessId(_process.getPID());
we1.setMexId(_mexId);
-
+
setStatus(Status.ASYNC);
_contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
_contexts.scheduler.schedulePersistedJob(we1.getDetail(), null);
}
-
/**
* Process the message-exchange interceptors.
@@ -133,7 +132,7 @@
* @return <code>true</code> if execution should continue,
<code>false</code> otherwise
*/
protected boolean processInterceptors(InterceptorInvoker invoker,
MessageExchangeDAO mexDao) {
- // TODO: should we give the in-mem dao connection for interceptors on
in-mem processes?
+ // TODO: should we give the in-mem dao connection for interceptors on
in-mem processes?
InterceptorContextImpl ictx = new
InterceptorContextImpl(_contexts.dao.getConnection(), mexDao.getProcess(),
null);
for (MessageExchangeInterceptor i : _contexts.globalIntereceptors)
@@ -150,25 +149,82 @@
invoker.invoke(i, mex, ictx);
} catch (FaultMessageExchangeException fme) {
__log.debug("interceptor " + i + " caused invoke on " + this + "
to be aborted with FAULT " + fme.getFaultName());
- mex.setFault(fme.getFaultName(), fme.getFaultData());
+ mex.serverFaulted(fme.getFaultName(), fme.getFaultData());
return false;
} catch (AbortMessageExchangeException ame) {
__log.debug("interceptor " + i + " cause invoke on " + this + " to
be aborted with FAILURE: " + ame.getMessage());
- mex.setFailure(MessageExchange.FailureType.ABORTED,
__msgs.msgInterceptorAborted(mex.getMessageExchangeId(), i
+ mex.serverFailed(MessageExchange.FailureType.ABORTED,
__msgs.msgInterceptorAborted(mex.getMessageExchangeId(), i
.toString(), ame.getMessage()), null);
return false;
}
return true;
}
+ protected void onStateChanged(MessageExchangeDAO mexdao, Status oldstatus,
final Status newstatus) {
+ MessageDAO response = mexdao.getResponse();
+ switch (newstatus) {
+ case RESPONSE: {
+ final Element msg = response.getData();
+ final QName msgtype = response.getType();
+ _process.scheduleRunnable(new Runnable() {
+ public void run() {
+ serverResponded(new MemBackedMessageImpl(msg, msgtype,
true));
+ }
+ });
+ }
+ break;
+ case FAULT: {
+ final QName fault = mexdao.getFault();
+ final Element faultMsg = response.getData();
+ final QName msgtype = response.getType();
+ _process.scheduleRunnable(new Runnable() {
+ public void run() {
+ serverFaulted(fault, new MemBackedMessageImpl(faultMsg,
msgtype, true));
+ }
- protected void onStateChanged(MessageExchangeDAO mexdao) {
- setStatus(Status.valueOf(mexdao.getStatus()));
+ });
+ }
+ break;
+ case FAILURE:
+ final String failureExplanation = mexdao.getFaultExplanation();
+ final FailureType ftype =
FailureType.valueOf(mexdao.getFailureType());
+ _process.scheduleRunnable(new Runnable() {
+ public void run() {
+ serverFailed(ftype, failureExplanation, null); // TODO add
failure detail
+ }
+
+ });
+ break;
+ }
}
-
-
+
protected void finalize() {
_process.unregisterMyRoleMex(this);
+ }
+
+
+ void serverFaulted(QName faultType, Message outputFaultMessage) throws
BpelEngineException {
+ _fault = faultType;
+ _response = (MessageImpl) outputFaultMessage;
+ setStatus(Status.FAULT);
+ }
+
+
+ void serverResponded(Message outputMessage) {
+ _fault = null;
+ _explanation = null;
+ _response = (MessageImpl) outputMessage;
+ _response.makeReadOnly();
+ setStatus(Status.RESPONSE);
+
+ }
+
+ void serverFailed(FailureType type, String reason, Element details) {
+ // TODO not using FailureType, nor details
+ _failureType = type;
+ _explanation = reason;
+ setStatus(Status.FAILURE);
+
}
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
Thu Aug 2 10:26:37 2007
@@ -102,7 +102,9 @@
}
sync();
checkReplyContextOk();
- setFault(faultType, outputFaultMessage);
+ _fault = faultType;
+ _response = (MessageImpl) outputFaultMessage;
+ setStatus(Status.FAULT);
sync();
if (!_blocked)
resumeInstance();
@@ -114,7 +116,8 @@
}
sync();
checkReplyContextOk();
- setResponse(response);
+ _response = (MessageImpl) response;
+ setStatus(Status.RESPONSE);
sync();
if (!_blocked)
resumeInstance();
@@ -127,7 +130,9 @@
}
sync();
checkReplyContextOk();
- setFailure(type, description, details);
+ _failureType = type;
+ _explanation = description;
+ setStatus(Status.FAILURE);
sync();
if (!_blocked)
resumeInstance();
Added:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java?view=auto&rev=562188
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
(added)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
Thu Aug 2 10:26:37 2007
@@ -0,0 +1,50 @@
+package org.apache.ode.bpel.engine;
+
+import javax.wsdl.Operation;
+
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.iapi.MessageExchangeContext;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.o.OPartnerLink;
+
+
+/**
+ *
+ *
+ * @author Maciej Szefler
+ *
+ */
+public class TransactedPartnerRoleMessageExchangeImpl extends
PartnerRoleMessageExchangeImpl {
+
+ TransactedPartnerRoleMessageExchangeImpl(BpelProcess process, String
mexId, OPartnerLink oplink, Operation operation,EndpointReference epr,
EndpointReference myRoleEPR, PartnerRoleChannel channel) {
+ super(process, mexId, oplink, operation, epr, myRoleEPR, channel);
+ }
+
+
+ /**
+ * The criteria for issuing a replyXXX call on TRANSACTED message
exchanges is that the replyXXX must come while the
+ * engine is blocked in an
+ * [EMAIL PROTECTED]
MessageExchangeContext#invokePartnerBlocking(org.apache.ode.bpel.iapi.PartnerRoleMessageExchange)}.
+ * method, AND the call must come from the engine thread.
+ */
+ @Override
+ protected void checkReplyContextOk() {
+ if (!_blocked)
+ throw new BpelEngineException("replyXXX operation attempted
outside of BLOCKING region!");
+ if (!_ownerThread.get())
+ throw new BpelEngineException("replyXXX operation attempted from
foreign thread!");
+
+ assert _contexts.isTransacted() : "Internal Error: owner thread must
be transactional!?!?!!?";
+ }
+
+
+ @Override
+ public InvocationStyle getInvocationStyle() {
+ return InvocationStyle.TRANSACTED;
+ }
+
+
+
+}
Propchange:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
Thu Aug 2 10:26:37 2007
@@ -277,9 +277,12 @@
public void setFailureType(String failureType) {
_failureType = failureType;
-
}
+ public String getFailureType() {
+ return _failureType;
+ }
+
public void setInvocationStyle(String invocationStyle) {
_istyle = invocationStyle;