Author: mszefler
Date: Wed Aug 15 10:21:24 2007
New Revision: 566271
URL: http://svn.apache.org/viewvc?view=rev&rev=566271
Log:
BART --
* added caching of myrolemex objects
* returned to reentrant bpelinstanceworker
* fixed broken interceptor code
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
Wed Aug 15 10:21:24 2007
@@ -78,8 +78,9 @@
*/
<T> T execInCurrentThread(Callable<T> callable) throws Exception {
+ // Allow recursive invocations. This allows us to nest P2P invocations
to an arbitrary depth.
if (isWorkerThread())
- throw new BpelEngineException("InternalError: Attempt to reenter
instance worker " + toString());
+ return doInstanceWork(callable);
final Semaphore ready = new Semaphore(0);
final Semaphore finished = new Semaphore(0);
@@ -103,14 +104,10 @@
}
- _activeInstance.set(_iid);
try {
return doInstanceWork(callable);
- } catch (Exception ex) {
- throw ex;
} finally {
finished.release();
- _activeInstance.set(null);
}
}
@@ -172,12 +169,14 @@
*/
private <T> T doInstanceWork(Callable<T> work) throws Exception {
__log.debug("Doing work for instance " + instanceId() +" in thread " +
Thread.currentThread());
+ _activeInstance.set(_iid);
try {
return work.call();
} catch (Exception ex) {
__log.error("Work for instance " + instanceId() + " in thread " +
Thread.currentThread() + " resulted in an exception." ,ex);
throw ex;
} finally {
+ _activeInstance.set(null);
__log.debug("Finished work for instance " + instanceId() + " in
thread " + Thread.currentThread());
}
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Wed Aug 15 10:21:24 2007
@@ -19,7 +19,6 @@
package org.apache.ode.bpel.engine;
import java.io.InputStream;
-import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@@ -30,7 +29,6 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import javax.wsdl.Operation;
@@ -60,10 +58,13 @@
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
+import org.apache.ode.bpel.intercept.FailMessageExchangeException;
+import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
@@ -145,7 +146,8 @@
final BpelServerImpl _server;
- final private List<WeakReference<MyRoleMessageExchangeImpl>>
_mexStateListeners = new
CopyOnWriteArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
+ /** Weak-reference cache of all the my-role message exchange objects. */
+ final private MyRoleMessageExchangeCache _myRoleMexCache = new
MyRoleMessageExchangeCache(this);
BpelProcess(BpelServerImpl server, ProcessConf conf, BpelEventListener
debugger) {
_server = server;
@@ -198,10 +200,7 @@
if (target == null) {
String errmsg =
__msgs.msgMyRoleRoutingFailure(mexdao.getMessageExchangeId());
__log.error(errmsg);
-
mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_ENDPOINT);
- mexdao.setFaultExplanation(errmsg);
- mexdao.setStatus(Status.ACK);
- mexdao.setAckType(AckType.FAILURE);
+ MexDaoUtil.setFailed(mexdao,
MessageExchange.FailureType.UNKNOWN_ENDPOINT, errmsg);
onMyRoleMexAck(mexdao, oldstatus);
return;
}
@@ -210,10 +209,13 @@
if (op == null) {
String errmsg =
__msgs.msgMyRoleRoutingFailure(mexdao.getMessageExchangeId());
__log.error(errmsg);
-
mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_OPERATION);
- mexdao.setFaultExplanation(errmsg);
- mexdao.setStatus(Status.ACK);
- mexdao.setAckType(AckType.FAILURE);
+ MexDaoUtil.setFailed(mexdao,
MessageExchange.FailureType.UNKNOWN_OPERATION, errmsg);
+ onMyRoleMexAck(mexdao, oldstatus);
+ return;
+ }
+
+ if (!processInterceptors(mexdao,
InterceptorInvoker.__onProcessInvoked)) {
+ __log.debug("Aborting processing of mex " +
mexdao.getMessageExchangeId() + " due to interceptors.");
onMyRoleMexAck(mexdao, oldstatus);
return;
}
@@ -227,12 +229,6 @@
mexdao.setProcess(getProcessDAO());
- // TODO: fix this
- // if (!processInterceptors(mex,
InterceptorInvoker.__onProcessInvoked)) {
- // __log.debug("Aborting processing of mex " + mex + " due to
interceptors.");
- // return;
- // }
-
markused();
CorrelationStatus cstatus = target.invokeMyRole(mexdao);
if (cstatus == null) {
@@ -384,32 +380,11 @@
brc.execute();
}
- void enqueueInstanceWork(Long instanceId, Runnable runnable) {
- BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
- iworker.enqueue(runnable);
- }
-
void enqueueInstanceTransaction(Long instanceId, final Runnable runnable) {
BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
iworker.enqueue(_server.new TransactedRunnable(runnable));
}
- /**
- * Schedule work for a given instance; work will occur if transaction
commits.
- *
- * @param instanceId
- * @param name
- */
- void scheduleInstanceWork(final Long instanceId, final Runnable runnable) {
- _contexts.registerCommitSynchronizer(new Runnable() {
- public void run() {
- BpelInstanceWorker iworker =
_instanceWorkerCache.get(instanceId);
- iworker.enqueue(new ProcessRunnable(runnable));
- }
- });
-
- }
-
private <T> T doInstanceWork(Long instanceId, final Callable<T> callable) {
try {
BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
@@ -487,16 +462,23 @@
* message exchange
* @return <code>true</code> if execution should continue,
<code>false</code> otherwise
*/
- boolean processInterceptors(MyRoleMessageExchangeImpl mex,
InterceptorInvoker invoker) {
- // InterceptorContextImpl ictx = new
InterceptorContextImpl(_contexts.dao.getConnection(), getProcessDAO(), _pconf);
- //
- // for (MessageExchangeInterceptor i : _mexInterceptors)
- // if (!mex.processInterceptor(i, mex, ictx, invoker))
- // return false;
- // for (MessageExchangeInterceptor i : getEngine().getInterceptors())
- // if (!mex.processInterceptor(i, mex, ictx, invoker))
- // return false;
- //
+ boolean processInterceptors(MessageExchangeDAO mexdao, InterceptorInvoker
invoker) {
+ InterceptorContextImpl ictx = new
InterceptorContextImpl(_contexts.dao.getConnection(), mexdao, getProcessDAO(),
_pconf);
+
+ try {
+ for (MessageExchangeInterceptor interceptor : _mexInterceptors)
+ invoker.invoke(interceptor, ictx);
+
+ for (MessageExchangeInterceptor interceptor :
_server._contexts.globalIntereceptors)
+ invoker.invoke(interceptor, ictx);
+ } catch (FailMessageExchangeException e) {
+ MexDaoUtil.setFailed(mexdao,FailureType.ABORTED, e.getMessage());
+ return false;
+ } catch (FaultMessageExchangeException e) {
+ MexDaoUtil.setFaulted(mexdao, e.getFaultName(), e.getFaultData());
+ return false;
+ }
+
return true;
}
@@ -813,10 +795,28 @@
}
- registerMyRoleMex(mex);
+ _myRoleMexCache.put(mex);
return mex;
}
+ /**
+ * Lookup a [EMAIL PROTECTED] MyRoleMessageExchangeImpl} object in the
cache, re-creating it if not found.
+ *
+ * @param mexdao
+ * DB representation of the mex.
+ * @return client representation
+ */
+ MyRoleMessageExchangeImpl lookupMyRoleMex(MessageExchangeDAO mexdao) {
+ return _myRoleMexCache.get(mexdao); // this will re-create if necessary
+ }
+
+ /**
+ * Create (or recreate) a [EMAIL PROTECTED] MyRoleMessageExchangeImpl}
object from data in the db. This method is used by the
+ * [EMAIL PROTECTED] MyRoleMessageExchangeCache} to re-create objects when
they are not found in the cache.
+ *
+ * @param mexdao
+ * @return
+ */
MyRoleMessageExchangeImpl recreateMyRoleMex(MessageExchangeDAO mexdao) {
InvocationStyle istyle = mexdao.getInvocationStyle();
@@ -1016,21 +1016,6 @@
}
}
- void registerMyRoleMex(MyRoleMessageExchangeImpl mymex) {
- _mexStateListeners.add(new
WeakReference<MyRoleMessageExchangeImpl>(mymex));
- }
-
- void unregisterMyRoleMex(MyRoleMessageExchangeImpl mymex) {
- ArrayList<WeakReference<MyRoleMessageExchangeImpl>> needsRemoval = new
ArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
- for (WeakReference<MyRoleMessageExchangeImpl> wref :
_mexStateListeners) {
- MyRoleMessageExchangeImpl mex = wref.get();
- if (mex == null || mex == mymex)
- needsRemoval.add(wref);
- }
- _mexStateListeners.removeAll(needsRemoval);
-
- }
-
void onMyRoleMexAck(MessageExchangeDAO mexdao, Status old) {
if (mexdao.getPipedMessageExchangeId() != null) /* p2p */{
@@ -1069,11 +1054,11 @@
caller.p2pWakeup(pmex);
} else /* not p2p */{
- // TODO: force a myrole mex to be created if it is not in cache.
- for (WeakReference<MyRoleMessageExchangeImpl> wr :
_mexStateListeners) {
- MyRoleMessageExchangeImpl mymex = wr.get();
- if (mymex != null && mymex.getMessageExchangeId() != null)
- mymex.onAsyncAck(mexdao);
+ // Do an Async wakeup if we are in the ASYNC state. If we're not,
we'll pick up the ACK when we unwind
+ // the stack.
+ if (old == Status.ASYNC) {
+ MyRoleMessageExchangeImpl mymex = _myRoleMexCache.get(mexdao);
+ mymex.onAsyncAck(mexdao);
}
}
@@ -1241,6 +1226,11 @@
Operation operation =
oplink.getPartnerRoleOperation(mexdao.getOperation());
+ if (!processInterceptors(mexdao,
InterceptorInvoker.__onPartnerInvoked)) {
+ __log.debug("Partner invocation intercepted.");
+ return;
+ }
+
try {
if (p2pProcess != null) {
/* P2P (process-to-process) invocation, special logic */
@@ -1324,10 +1314,9 @@
* @param myrolemex
*/
private void p2pWakeup(final MessageExchangeDAO prolemex) {
- BpelInstanceWorker iworker =
_instanceWorkerCache.get(prolemex.getInstance().getInstanceId());
try {
- iworker.execInCurrentThread(new Callable<Void>() {
+ doInstanceWork(prolemex.getInstance().getInstanceId(), new
Callable<Void>() {
public Void call() throws Exception {
executeContinueInstancePartnerRoleResponseReceived(prolemex);
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
Wed Aug 15 10:21:24 2007
@@ -535,7 +535,7 @@
case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
return process.createPartnerRoleMex(mexdao);
case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
- return process.recreateMyRoleMex(mexdao);
+ return process.lookupMyRoleMex(mexdao);
default:
String errmsg = "BpelEngineImpl: internal error,
invalid MexDAO direction: " + mexId;
__log.fatal(errmsg);
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java
Wed Aug 15 10:21:24 2007
@@ -19,23 +19,28 @@
package org.apache.ode.bpel.engine;
import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.iapi.ProcessConf;
-import
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+import
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorEvent;
/**
- * Implementation of the [EMAIL PROTECTED]
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext}
+ * Implementation of the [EMAIL PROTECTED]
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorEvent}
* interface.
+ *
* @author Maciej Szefler (m s z e f l e r @ g m a i l . c o m)
*
*/
-public class InterceptorContextImpl implements InterceptorContext{
+class InterceptorContextImpl implements InterceptorEvent{
private ProcessDAO _processDao;
private BpelDAOConnection _connection;
private ProcessConf _pconf;
+ private MessageExchangeDAO _mexdao;
- public InterceptorContextImpl(BpelDAOConnection connection, ProcessDAO
processDAO, ProcessConf pconf) {
+ InterceptorContextImpl(BpelDAOConnection connection, MessageExchangeDAO
mexdao,
+ ProcessDAO processDAO, ProcessConf pconf) {
_connection = connection;
+ _mexdao = mexdao;
_processDao = processDAO;
_pconf = pconf;
}
@@ -50,6 +55,10 @@
public ProcessConf getProcessConf() {
return _pconf;
+ }
+
+ public MessageExchangeDAO getMessageExchangeDAO() {
+ return _mexdao;
}
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
Wed Aug 15 10:21:24 2007
@@ -4,19 +4,32 @@
import java.util.HashMap;
import java.util.Iterator;
+import javax.wsdl.Operation;
+
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.o.OPartnerLink;
+
/**
- * Manage [EMAIL PROTECTED] MyRoleMessageExchangeImpl} object references.
+ * Manage [EMAIL PROTECTED] MyRoleMessageExchangeImpl} object references.
*
* @author Maciej Szefler <mszefler at gmail dot com>
- *
+ *
*/
class MyRoleMessageExchangeCache {
-
+
private static final int CLEANUP_PERIOD = 20;
private HashMap<String, WeakReference<MyRoleMessageExchangeImpl>> _cache =
new HashMap<String, WeakReference<MyRoleMessageExchangeImpl>>();
private int _inserts = 0;
+
+ private BpelProcess _process;
+
+ MyRoleMessageExchangeCache(BpelProcess process) {
+ _process = process;
+ }
void put(MyRoleMessageExchangeImpl mex) {
synchronized (this) {
@@ -24,47 +37,48 @@
if (_inserts > CLEANUP_PERIOD) {
cleanup();
}
-
+
WeakReference<MyRoleMessageExchangeImpl> ref =
_cache.get(mex.getMessageExchangeId());
if (ref != null && ref.get() != null)
throw new IllegalStateException("InternalError: duplicate
myrolemex registration!");
-
+
_cache.put(mex.getMessageExchangeId(), new
WeakReference<MyRoleMessageExchangeImpl>(mex));
}
}
-
+
/**
- * Attempt to retrieve a [EMAIL PROTECTED] MyRoleMessageExchangeImpl} for
the given identifier.
- * @param mexId
+ * Retrieve a [EMAIL PROTECTED] MyRoleMessageExchangeImpl} from the cache,
re-creating if necessary.
+ *
+ * @param mexdao
* @return
*/
- MyRoleMessageExchangeImpl get(String mexId) {
- synchronized(this) {
- WeakReference<MyRoleMessageExchangeImpl> ref = _cache.get(mexId);
- if (ref == null)
- return null;
- MyRoleMessageExchangeImpl mex = ref.get();
- if (mex == null)
- _cache.remove(mexId);
+ MyRoleMessageExchangeImpl get(MessageExchangeDAO mexdao) {
+ synchronized (this) {
+ WeakReference<MyRoleMessageExchangeImpl> ref =
_cache.get(mexdao.getMessageExchangeId());
+ MyRoleMessageExchangeImpl mex = ref == null ? null : ref.get();
+
+ if (mex == null) {
+ mex = _process.recreateMyRoleMex(mexdao);
+ _cache.put(mexdao.getMessageExchangeId(), new
WeakReference<MyRoleMessageExchangeImpl>(mex));
+ }
+
return mex;
-
+
}
}
/**
* Remove stale references.
- *
+ *
*/
- void cleanup() {
- synchronized(this){
- for (Iterator<WeakReference<MyRoleMessageExchangeImpl>> i =
_cache.values().iterator(); i.hasNext(); ) {
- WeakReference<MyRoleMessageExchangeImpl> ref = i.next();
- if (ref.get() == null)
- i.remove();
- }
-
- _inserts = 0;
+ private void cleanup() {
+ for (Iterator<WeakReference<MyRoleMessageExchangeImpl>> i =
_cache.values().iterator(); i.hasNext();) {
+ WeakReference<MyRoleMessageExchangeImpl> ref = i.next();
+ if (ref.get() == null)
+ i.remove();
}
+
+ _inserts = 0;
}
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Wed Aug 15 10:21:24 2007
@@ -18,7 +18,7 @@
import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
-import
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+import
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorEvent;
import org.apache.ode.bpel.o.OPartnerLink;
abstract class MyRoleMessageExchangeImpl extends MessageExchangeImpl
implements MyRoleMessageExchange {
@@ -128,11 +128,7 @@
__log.debug("invoke() EPR= " + _epr + " ==> " + _process);
try {
- if (!processInterceptors(InterceptorInvoker.__onBpelServerInvoked,
dao)) {
- assert getStatus() == Status.ACK;
- return dao;
- }
-
+
_process.invokeProcess(dao);
} finally {
if (dao.getStatus() == Status.ACK) {
@@ -147,56 +143,8 @@
}
- /**
- * Process the message-exchange interceptors.
- *
- * @param mex
- * message exchange
- * @return <code>true</code> if execution should continue,
<code>false</code> otherwise
- */
- protected boolean processInterceptors(InterceptorInvoker invoker,
MessageExchangeDAO mexDao) {
- // TODO: should we give the in-mem dao connection for interceptors on
in-mem processes?
- InterceptorContextImpl ictx = new
InterceptorContextImpl(_contexts.dao.getConnection(), mexDao.getProcess(),
null);
-
- for (MessageExchangeInterceptor i : _contexts.globalIntereceptors)
- if (!processInterceptor(i, this, ictx, invoker, mexDao))
- return false;
-
- return true;
- }
-
- protected boolean processInterceptor(
- MessageExchangeInterceptor i,
- MyRoleMessageExchangeImpl mex,
- InterceptorContext ictx,
- InterceptorInvoker invoker,
- MessageExchangeDAO mexdao) {
- __log.debug(invoker + "--> interceptor " + i);
- try {
- invoker.invoke(i, mex, ictx);
- } catch (FaultMessageExchangeException fme) {
- __log.debug("interceptor " + i + " caused invoke on " + this + "
to be aborted with FAULT " + fme.getFaultName());
- MexDaoUtil.setFaulted(mexdao, fme.getFaultName(),
fme.getFaultData() == null ? null : fme.getFaultData().getMessage());
- return false;
- } catch (AbortMessageExchangeException ame) {
- __log.debug("interceptor " + i + " cause invoke on " + this + " to
be aborted with FAILURE: " + ame.getMessage());
- MexDaoUtil.setFailed(mexdao, FailureType.ABORTED,
__msgs.msgInterceptorAborted(mex.getMessageExchangeId(), i
- .toString(), ame.getMessage()));
- return false;
- }
- return true;
- }
-
-
- protected abstract void onAsyncAck(MessageExchangeDAO mexdao);
-
-
- protected void finalize() {
- _process.unregisterMyRoleMex(this);
- }
-
+ protected abstract void onAsyncAck(MessageExchangeDAO mexdao);
-
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
Wed Aug 15 10:21:24 2007
@@ -154,6 +154,7 @@
_response = response;
_fault = fault;
_failureType = failureType;
+ _explanation = explanation;
ack(ackType);
_future.done(Status.ACK);
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
Wed Aug 15 10:21:24 2007
@@ -46,14 +46,14 @@
__log.debug("asyncResponseReceived: for IID " + getIID() );
}
- _process.scheduleInstanceWork(getIID(), _process._server.new
TransactedRunnable(new Runnable() {
+ _process.enqueueInstanceTransaction(getIID(), new Runnable() {
public void run() {
MessageExchangeDAO dao = getDAO();
save(dao);
_process.executeContinueInstancePartnerRoleResponseReceived(dao);
}
- }));
+ });
}
@Override
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java
Wed Aug 15 10:21:24 2007
@@ -20,7 +20,7 @@
import javax.xml.namespace.QName;
-import org.apache.ode.bpel.iapi.Message;
+import org.w3c.dom.Element;
/**
* Exception thrown by [EMAIL PROTECTED]
org.apache.ode.bpel.intercept.MessageExchangeInterceptor}
@@ -32,9 +32,9 @@
private static final long serialVersionUID = 1L;
private QName _faultName;
- private Message _faultData;
+ private Element _faultData;
- public FaultMessageExchangeException(String errmsg, QName faultName,
Message faultData) {
+ public FaultMessageExchangeException(String errmsg, QName faultName,
Element faultData) {
super(errmsg);
_faultName = faultName;
@@ -45,7 +45,7 @@
return _faultName;
}
- public Message getFaultData() {
+ public Element getFaultData() {
return _faultData;
}
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
Wed Aug 15 10:21:24 2007
@@ -18,10 +18,7 @@
*/
package org.apache.ode.bpel.intercept;
-import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+import
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorEvent;
/**
* Helper for invoking the appropriate [EMAIL PROTECTED]
org.apache.ode.bpel.intercept.MessageExchangeInterceptor}
@@ -34,35 +31,28 @@
private final String _name;
// Closures anyone?
- /** Invoke [EMAIL PROTECTED]
MessageExchangeInterceptor#onProcessInvoked(MyRoleMessageExchange,
InterceptorContext)} */
+ /** Invoke [EMAIL PROTECTED]
MessageExchangeInterceptor#onProcessInvoked(MyRoleMessageExchange,
InterceptorEvent)} */
public static final InterceptorInvoker __onProcessInvoked= new
InterceptorInvoker("onProcessInvoked") {
- public void invoke(MessageExchangeInterceptor i,
MessageExchange mex, InterceptorContext ictx)
+ public void invoke(MessageExchangeInterceptor i,
InterceptorEvent ictx)
throws FailMessageExchangeException,
FaultMessageExchangeException {
- i.onProcessInvoked((MyRoleMessageExchange) mex, ictx);
- }
- };
-
- /** Invoke [EMAIL PROTECTED]
MessageExchangeInterceptor#onBpelServerInvoked(MyRoleMessageExchange,
InterceptorContext)} */
- public static final InterceptorInvoker __onBpelServerInvoked = new
InterceptorInvoker("onBpelServerInvoked") {
- public void invoke(MessageExchangeInterceptor i,
MessageExchange mex, InterceptorContext ictx)
- throws FailMessageExchangeException,
FaultMessageExchangeException {
- i.onBpelServerInvoked((MyRoleMessageExchange) mex,
ictx);
+ i.onProcessInvoked(ictx);
}
};
- /** Invoke [EMAIL PROTECTED]
MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange,
InterceptorContext)} */
+
+ /** Invoke [EMAIL PROTECTED]
MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange,
InterceptorEvent)} */
public static final InterceptorInvoker __onPartnerInvoked = new
InterceptorInvoker("onPartnerInvoked") {
- public void invoke(MessageExchangeInterceptor i,
MessageExchange mex, InterceptorContext ictx)
+ public void invoke(MessageExchangeInterceptor i,
InterceptorEvent ictx)
throws FailMessageExchangeException,
FaultMessageExchangeException {
- i.onPartnerInvoked((PartnerRoleMessageExchange) mex,
ictx);
+ i.onPartnerInvoked(ictx);
}
};
- /** Invoke [EMAIL PROTECTED]
MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange,
InterceptorContext)} */
+ /** Invoke [EMAIL PROTECTED]
MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange,
InterceptorEvent)} */
public static final InterceptorInvoker __onNewInstanceInvoked = new
InterceptorInvoker("onNewInstanceInvoked") {
- public void invoke(MessageExchangeInterceptor i,
MessageExchange mex, InterceptorContext ictx)
+ public void invoke(MessageExchangeInterceptor i,
InterceptorEvent ictx)
throws FailMessageExchangeException,
FaultMessageExchangeException {
- i.onNewInstanceInvoked((MyRoleMessageExchange) mex,
ictx);
+ i.onNewInstanceInvoked(ictx);
}
};
@@ -71,7 +61,7 @@
_name = name;
}
- public abstract void invoke(MessageExchangeInterceptor i,
MessageExchange mex, InterceptorContext ictx)
+ public abstract void invoke(MessageExchangeInterceptor i,
InterceptorEvent ictx)
throws FailMessageExchangeException,
FaultMessageExchangeException;
public String toString() {
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
Wed Aug 15 10:21:24 2007
@@ -19,14 +19,14 @@
package org.apache.ode.bpel.intercept;
import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessConf;
/**
- * Hook into the BPEL server that enables intercepting of message exchange
- * invocation.
+ * Hook into the BPEL server that enables intercepting of parntner/server
invocations. This interface operates at
+ * a level that is a bit lower than the IAPI, as it allows access to internal
engine datastructures. Caution should
+ * be used when implementing interceptors.
*
* @author Maciej Szefler
*
@@ -40,7 +40,7 @@
* @param mex
* message exchange
*/
- void onBpelServerInvoked(MyRoleMessageExchange mex, InterceptorContext ic)
+ void onBpelServerInvoked(InterceptorEvent ic)
throws FailMessageExchangeException, FaultMessageExchangeException;
/**
@@ -50,7 +50,7 @@
* @param mex
* message exchange
*/
- void onProcessInvoked(MyRoleMessageExchange mex, InterceptorContext ic)
+ void onProcessInvoked(InterceptorEvent ic)
throws FailMessageExchangeException, FaultMessageExchangeException;
/**
@@ -61,7 +61,7 @@
* @param mex
* message exchange
*/
- void onNewInstanceInvoked(MyRoleMessageExchange mex, InterceptorContext ic)
+ void onNewInstanceInvoked(InterceptorEvent ic)
throws FailMessageExchangeException, FaultMessageExchangeException;
/**
@@ -71,17 +71,30 @@
* @param mex
* message exchange
*/
- void onPartnerInvoked(PartnerRoleMessageExchange mex, InterceptorContext
ic)
+ void onPartnerInvoked(InterceptorEvent ic)
throws FailMessageExchangeException, FaultMessageExchangeException;
- public interface InterceptorContext {
+ /**
+ * Representation of an intercept event.
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
+ public interface InterceptorEvent {
+ /** Get the connection to the data store. */
BpelDAOConnection getConnection();
-
+
+ /** Get the DB representation of the process. */
ProcessDAO getProcessDAO();
+ /** Get the process configuration. */
ProcessConf getProcessConf();
+
+ /** Get the database representation of the message exchange. */
+ MessageExchangeDAO getMessageExchangeDAO();
+
}
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
Wed Aug 15 10:21:24 2007
@@ -31,25 +31,20 @@
*/
public class NoOpInterceptor implements MessageExchangeInterceptor {
- public void onBpelServerInvoked(MyRoleMessageExchange mex,
- InterceptorContext ic) throws
FailMessageExchangeException,
- FaultMessageExchangeException {
- }
+ public void onBpelServerInvoked(InterceptorEvent ic) throws
FailMessageExchangeException, FaultMessageExchangeException {
+
+ }
- public void onProcessInvoked(MyRoleMessageExchange mex,
- InterceptorContext ic) throws
FailMessageExchangeException,
- FaultMessageExchangeException {
- }
+ public void onNewInstanceInvoked(InterceptorEvent ic) throws
FailMessageExchangeException, FaultMessageExchangeException {
+
+ }
- public void onPartnerInvoked(PartnerRoleMessageExchange mex,
- InterceptorContext ic) throws
FailMessageExchangeException,
- FaultMessageExchangeException {
- }
+ public void onPartnerInvoked(InterceptorEvent ic) throws
FailMessageExchangeException, FaultMessageExchangeException {
- public void onNewInstanceInvoked(MyRoleMessageExchange mex,
- InterceptorContext ic) throws
FailMessageExchangeException,
- FaultMessageExchangeException {
+ }
- }
+ public void onProcessInvoked(InterceptorEvent ic) throws
FailMessageExchangeException, FaultMessageExchangeException {
+
+ }
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
Wed Aug 15 10:21:24 2007
@@ -18,12 +18,12 @@
*/
package org.apache.ode.bpel.intercept;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.w3c.dom.Node;
-import org.w3c.dom.Text;
+import java.util.Map;
import javax.xml.namespace.QName;
-import java.util.Map;
+
+import org.w3c.dom.Node;
+import org.w3c.dom.Text;
/**
* An example of a simple interceptor providing a "throttling" capability -
that is an
@@ -36,8 +36,7 @@
private static final QName PROP_MAX_INSTANCES = new
QName("urn:org.apache.ode.bpel.intercept", "maxInstances");
@Override
- public void onNewInstanceInvoked(MyRoleMessageExchange mex,
- InterceptorContext ic) throws
FailMessageExchangeException {
+ public void onNewInstanceInvoked(InterceptorEvent ic) throws
FailMessageExchangeException {
int maxInstances;
try {
maxInstances =
Integer.valueOf(getSimpleProperty(PROP_MAX_INSTANCES, ic));
@@ -56,7 +55,7 @@
* @param ic interceptor context
* @return value of the property, or <code>null</code> if not set
*/
- private String getSimpleProperty(QName propertyName, InterceptorContext
ic) {
+ private String getSimpleProperty(QName propertyName, InterceptorEvent ic) {
Map<QName, Node> props = ic.getProcessConf().getProperties();
for (Map.Entry<QName, Node> prop : props.entrySet()) {
if (prop.getKey().equals(propertyName))