Author: mszefler
Date: Wed Aug 15 10:21:24 2007
New Revision: 566271

URL: http://svn.apache.org/viewvc?view=rev&rev=566271
Log:
BART -- 
* added caching of myrolemex objects 
* returned to reentrant bpelinstanceworker
* fixed broken interceptor code

Modified:
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
 Wed Aug 15 10:21:24 2007
@@ -78,8 +78,9 @@
      */
     <T> T execInCurrentThread(Callable<T> callable) throws Exception {
 
+        // Allow recursive invocations. This allows us to nest P2P invocations 
to an arbitrary depth.
         if (isWorkerThread())
-            throw new BpelEngineException("InternalError: Attempt to reenter 
instance worker " + toString());
+            return doInstanceWork(callable);
         
         final Semaphore ready = new Semaphore(0);
         final Semaphore finished = new Semaphore(0);
@@ -103,14 +104,10 @@
         }
 
 
-        _activeInstance.set(_iid);
         try {
             return doInstanceWork(callable);
-        } catch (Exception ex) {
-            throw ex;
         } finally {
             finished.release();
-            _activeInstance.set(null);
         }
 
     }
@@ -172,12 +169,14 @@
      */
     private <T> T doInstanceWork(Callable<T> work) throws Exception {
         __log.debug("Doing work for instance " + instanceId() +" in thread " + 
Thread.currentThread());
+        _activeInstance.set(_iid);
         try {
             return work.call();
         } catch (Exception ex) {
             __log.error("Work for instance " + instanceId() + " in thread "  + 
Thread.currentThread() + " resulted in an exception." ,ex);
             throw ex;
         } finally {
+            _activeInstance.set(null);
             __log.debug("Finished work for instance " + instanceId() + " in 
thread " + Thread.currentThread());
         }
     }

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 Wed Aug 15 10:21:24 2007
@@ -19,7 +19,6 @@
 package org.apache.ode.bpel.engine;
 
 import java.io.InputStream;
-import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -30,7 +29,6 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
 
 import javax.wsdl.Operation;
@@ -60,10 +58,13 @@
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.ProcessConf;
 import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
+import org.apache.ode.bpel.intercept.FailMessageExchangeException;
+import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
@@ -145,7 +146,8 @@
 
     final BpelServerImpl _server;
 
-    final private List<WeakReference<MyRoleMessageExchangeImpl>> 
_mexStateListeners = new 
CopyOnWriteArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
+    /** Weak-reference cache of all the my-role message exchange objects. */
+    final private MyRoleMessageExchangeCache _myRoleMexCache = new 
MyRoleMessageExchangeCache(this);
 
     BpelProcess(BpelServerImpl server, ProcessConf conf, BpelEventListener 
debugger) {
         _server = server;
@@ -198,10 +200,7 @@
             if (target == null) {
                 String errmsg = 
__msgs.msgMyRoleRoutingFailure(mexdao.getMessageExchangeId());
                 __log.error(errmsg);
-                
mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_ENDPOINT);
-                mexdao.setFaultExplanation(errmsg);
-                mexdao.setStatus(Status.ACK);
-                mexdao.setAckType(AckType.FAILURE);
+                MexDaoUtil.setFailed(mexdao, 
MessageExchange.FailureType.UNKNOWN_ENDPOINT, errmsg);
                 onMyRoleMexAck(mexdao, oldstatus);
                 return;
             }
@@ -210,10 +209,13 @@
             if (op == null) {
                 String errmsg = 
__msgs.msgMyRoleRoutingFailure(mexdao.getMessageExchangeId());
                 __log.error(errmsg);
-                
mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_OPERATION);
-                mexdao.setFaultExplanation(errmsg);
-                mexdao.setStatus(Status.ACK);
-                mexdao.setAckType(AckType.FAILURE);
+                MexDaoUtil.setFailed(mexdao, 
MessageExchange.FailureType.UNKNOWN_OPERATION, errmsg);
+                onMyRoleMexAck(mexdao, oldstatus);
+                return;
+            }
+
+            if (!processInterceptors(mexdao, 
InterceptorInvoker.__onProcessInvoked)) {
+                __log.debug("Aborting processing of mex " + 
mexdao.getMessageExchangeId() + " due to interceptors.");
                 onMyRoleMexAck(mexdao, oldstatus);
                 return;
             }
@@ -227,12 +229,6 @@
 
             mexdao.setProcess(getProcessDAO());
 
-            // TODO: fix this
-            // if (!processInterceptors(mex, 
InterceptorInvoker.__onProcessInvoked)) {
-            // __log.debug("Aborting processing of mex " + mex + " due to 
interceptors.");
-            // return;
-            // }
-
             markused();
             CorrelationStatus cstatus = target.invokeMyRole(mexdao);
             if (cstatus == null) {
@@ -384,32 +380,11 @@
         brc.execute();
     }
 
-    void enqueueInstanceWork(Long instanceId, Runnable runnable) {
-        BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
-        iworker.enqueue(runnable);
-    }
-
     void enqueueInstanceTransaction(Long instanceId, final Runnable runnable) {
         BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
         iworker.enqueue(_server.new TransactedRunnable(runnable));
     }
 
-    /**
-     * Schedule work for a given instance; work will occur if transaction 
commits.
-     * 
-     * @param instanceId
-     * @param name
-     */
-    void scheduleInstanceWork(final Long instanceId, final Runnable runnable) {
-        _contexts.registerCommitSynchronizer(new Runnable() {
-            public void run() {
-                BpelInstanceWorker iworker = 
_instanceWorkerCache.get(instanceId);
-                iworker.enqueue(new ProcessRunnable(runnable));
-            }
-        });
-
-    }
-
     private <T> T doInstanceWork(Long instanceId, final Callable<T> callable) {
         try {
             BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
@@ -487,16 +462,23 @@
      *            message exchange
      * @return <code>true</code> if execution should continue, 
<code>false</code> otherwise
      */
-    boolean processInterceptors(MyRoleMessageExchangeImpl mex, 
InterceptorInvoker invoker) {
-        // InterceptorContextImpl ictx = new 
InterceptorContextImpl(_contexts.dao.getConnection(), getProcessDAO(), _pconf);
-        //
-        // for (MessageExchangeInterceptor i : _mexInterceptors)
-        // if (!mex.processInterceptor(i, mex, ictx, invoker))
-        // return false;
-        // for (MessageExchangeInterceptor i : getEngine().getInterceptors())
-        // if (!mex.processInterceptor(i, mex, ictx, invoker))
-        // return false;
-        //
+    boolean processInterceptors(MessageExchangeDAO mexdao, InterceptorInvoker 
invoker) {
+        InterceptorContextImpl ictx = new 
InterceptorContextImpl(_contexts.dao.getConnection(), mexdao, getProcessDAO(), 
_pconf);
+
+        try {
+            for (MessageExchangeInterceptor interceptor : _mexInterceptors)
+                invoker.invoke(interceptor, ictx);
+
+            for (MessageExchangeInterceptor interceptor : 
_server._contexts.globalIntereceptors)
+                invoker.invoke(interceptor, ictx);
+        } catch (FailMessageExchangeException e) {
+            MexDaoUtil.setFailed(mexdao,FailureType.ABORTED, e.getMessage());
+            return false;
+        } catch (FaultMessageExchangeException e) {
+            MexDaoUtil.setFaulted(mexdao, e.getFaultName(), e.getFaultData());
+            return false;
+        }
+
         return true;
     }
 
@@ -813,10 +795,28 @@
 
         }
 
-        registerMyRoleMex(mex);
+        _myRoleMexCache.put(mex);
         return mex;
     }
 
+    /**
+     * Lookup a [EMAIL PROTECTED] MyRoleMessageExchangeImpl} object in the 
cache, re-creating it if not found.
+     * 
+     * @param mexdao
+     *            DB representation of the mex.
+     * @return client representation
+     */
+    MyRoleMessageExchangeImpl lookupMyRoleMex(MessageExchangeDAO mexdao) {
+        return _myRoleMexCache.get(mexdao); // this will re-create if necessary
+    }
+
+    /**
+     * Create (or recreate) a [EMAIL PROTECTED] MyRoleMessageExchangeImpl} 
object from data in the db. This method is used by the
+     * [EMAIL PROTECTED] MyRoleMessageExchangeCache} to re-create objects when 
they are not found in the cache.
+     * 
+     * @param mexdao
+     * @return
+     */
     MyRoleMessageExchangeImpl recreateMyRoleMex(MessageExchangeDAO mexdao) {
         InvocationStyle istyle = mexdao.getInvocationStyle();
 
@@ -1016,21 +1016,6 @@
         }
     }
 
-    void registerMyRoleMex(MyRoleMessageExchangeImpl mymex) {
-        _mexStateListeners.add(new 
WeakReference<MyRoleMessageExchangeImpl>(mymex));
-    }
-
-    void unregisterMyRoleMex(MyRoleMessageExchangeImpl mymex) {
-        ArrayList<WeakReference<MyRoleMessageExchangeImpl>> needsRemoval = new 
ArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
-        for (WeakReference<MyRoleMessageExchangeImpl> wref : 
_mexStateListeners) {
-            MyRoleMessageExchangeImpl mex = wref.get();
-            if (mex == null || mex == mymex)
-                needsRemoval.add(wref);
-        }
-        _mexStateListeners.removeAll(needsRemoval);
-
-    }
-
     void onMyRoleMexAck(MessageExchangeDAO mexdao, Status old) {
 
         if (mexdao.getPipedMessageExchangeId() != null) /* p2p */{
@@ -1069,11 +1054,11 @@
                 caller.p2pWakeup(pmex);
 
         } else /* not p2p */{
-            // TODO: force a myrole mex to be created if it is not in cache.
-            for (WeakReference<MyRoleMessageExchangeImpl> wr : 
_mexStateListeners) {
-                MyRoleMessageExchangeImpl mymex = wr.get();
-                if (mymex != null && mymex.getMessageExchangeId() != null)
-                    mymex.onAsyncAck(mexdao);
+            // Do an Async wakeup if we are in the ASYNC state. If we're not, 
we'll pick up the ACK when we unwind
+            // the stack.
+            if (old == Status.ASYNC) {
+                MyRoleMessageExchangeImpl mymex = _myRoleMexCache.get(mexdao);
+                mymex.onAsyncAck(mexdao);
             }
         }
 
@@ -1241,6 +1226,11 @@
 
         Operation operation = 
oplink.getPartnerRoleOperation(mexdao.getOperation());
 
+        if (!processInterceptors(mexdao, 
InterceptorInvoker.__onPartnerInvoked))  {
+            __log.debug("Partner invocation intercepted.");
+            return;
+        }
+        
         try {
             if (p2pProcess != null) {
                 /* P2P (process-to-process) invocation, special logic */
@@ -1324,10 +1314,9 @@
      * @param myrolemex
      */
     private void p2pWakeup(final MessageExchangeDAO prolemex) {
-        BpelInstanceWorker iworker = 
_instanceWorkerCache.get(prolemex.getInstance().getInstanceId());
 
         try {
-            iworker.execInCurrentThread(new Callable<Void>() {
+            doInstanceWork(prolemex.getInstance().getInstanceId(), new 
Callable<Void>() {
 
                 public Void call() throws Exception {
                     
executeContinueInstancePartnerRoleResponseReceived(prolemex);

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
 Wed Aug 15 10:21:24 2007
@@ -535,7 +535,7 @@
                     case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
                         return process.createPartnerRoleMex(mexdao);
                     case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
-                        return process.recreateMyRoleMex(mexdao);
+                        return process.lookupMyRoleMex(mexdao);
                     default:
                         String errmsg = "BpelEngineImpl: internal error, 
invalid MexDAO direction: " + mexId;
                         __log.fatal(errmsg);

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java
 Wed Aug 15 10:21:24 2007
@@ -19,23 +19,28 @@
 package org.apache.ode.bpel.engine;
 
 import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.iapi.ProcessConf;
-import 
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+import 
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorEvent;
 
 /**
- * Implementation of the [EMAIL PROTECTED] 
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext}
+ * Implementation of the [EMAIL PROTECTED] 
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorEvent}
  * interface.
+ * 
  * @author Maciej Szefler (m s z e f l e r @ g m a i l . c o m)
  *
  */
-public class InterceptorContextImpl implements InterceptorContext{
+class InterceptorContextImpl implements InterceptorEvent{
     private ProcessDAO _processDao;
     private BpelDAOConnection _connection;
     private ProcessConf _pconf;
+    private MessageExchangeDAO _mexdao;
 
-    public InterceptorContextImpl(BpelDAOConnection connection, ProcessDAO 
processDAO, ProcessConf pconf) {
+    InterceptorContextImpl(BpelDAOConnection connection, MessageExchangeDAO 
mexdao,
+            ProcessDAO processDAO, ProcessConf pconf) {
         _connection = connection;
+        _mexdao = mexdao;
         _processDao = processDAO;
         _pconf = pconf;
     }
@@ -50,6 +55,10 @@
 
     public ProcessConf getProcessConf() {
         return _pconf;
+    }
+
+    public MessageExchangeDAO getMessageExchangeDAO() {
+        return _mexdao;
     }
 
 }

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
 Wed Aug 15 10:21:24 2007
@@ -4,19 +4,32 @@
 import java.util.HashMap;
 import java.util.Iterator;
 
+import javax.wsdl.Operation;
+
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.o.OPartnerLink;
+
 /**
- * Manage [EMAIL PROTECTED] MyRoleMessageExchangeImpl} object references. 
+ * Manage [EMAIL PROTECTED] MyRoleMessageExchangeImpl} object references.
  * 
  * @author Maciej Szefler <mszefler at gmail dot com>
- *
+ * 
  */
 class MyRoleMessageExchangeCache {
-    
+
     private static final int CLEANUP_PERIOD = 20;
 
     private HashMap<String, WeakReference<MyRoleMessageExchangeImpl>> _cache = 
new HashMap<String, WeakReference<MyRoleMessageExchangeImpl>>();
 
     private int _inserts = 0;
+
+    private BpelProcess _process;
+
+    MyRoleMessageExchangeCache(BpelProcess process) {
+        _process = process;
+    }
     
     void put(MyRoleMessageExchangeImpl mex) {
         synchronized (this) {
@@ -24,47 +37,48 @@
             if (_inserts > CLEANUP_PERIOD) {
                 cleanup();
             }
-                
+
             WeakReference<MyRoleMessageExchangeImpl> ref = 
_cache.get(mex.getMessageExchangeId());
             if (ref != null && ref.get() != null)
                 throw new IllegalStateException("InternalError: duplicate 
myrolemex registration!");
-            
+
             _cache.put(mex.getMessageExchangeId(), new 
WeakReference<MyRoleMessageExchangeImpl>(mex));
         }
     }
-    
+
     /**
-     * Attempt to retrieve a [EMAIL PROTECTED] MyRoleMessageExchangeImpl} for 
the given identifier.
-     * @param mexId
+     * Retrieve a [EMAIL PROTECTED] MyRoleMessageExchangeImpl} from the cache, 
re-creating if necessary.
+     * 
+     * @param mexdao
      * @return
      */
-    MyRoleMessageExchangeImpl get(String mexId) {
-        synchronized(this) {
-            WeakReference<MyRoleMessageExchangeImpl> ref = _cache.get(mexId);
-            if (ref == null)
-                return null;
-            MyRoleMessageExchangeImpl mex = ref.get();
-            if (mex == null)
-                _cache.remove(mexId);
+    MyRoleMessageExchangeImpl get(MessageExchangeDAO mexdao) {
+        synchronized (this) {
+            WeakReference<MyRoleMessageExchangeImpl> ref = 
_cache.get(mexdao.getMessageExchangeId());
+            MyRoleMessageExchangeImpl mex = ref == null ? null : ref.get();
+
+            if (mex == null) {
+                mex = _process.recreateMyRoleMex(mexdao);
+                _cache.put(mexdao.getMessageExchangeId(), new 
WeakReference<MyRoleMessageExchangeImpl>(mex));
+            }
+                
             return mex;
-        
+
         }
 
     }
 
     /**
      * Remove stale references.
-     *
+     * 
      */
-    void cleanup() {
-        synchronized(this){
-            for (Iterator<WeakReference<MyRoleMessageExchangeImpl>> i = 
_cache.values().iterator(); i.hasNext(); ) {
-                WeakReference<MyRoleMessageExchangeImpl> ref = i.next();
-                if (ref.get() == null)
-                    i.remove();
-            }
-            
-            _inserts = 0;
+    private void cleanup() {
+        for (Iterator<WeakReference<MyRoleMessageExchangeImpl>> i = 
_cache.values().iterator(); i.hasNext();) {
+            WeakReference<MyRoleMessageExchangeImpl> ref = i.next();
+            if (ref.get() == null)
+                i.remove();
         }
+
+        _inserts = 0;
     }
 }

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
 Wed Aug 15 10:21:24 2007
@@ -18,7 +18,7 @@
 import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
-import 
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+import 
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorEvent;
 import org.apache.ode.bpel.o.OPartnerLink;
 
 abstract class MyRoleMessageExchangeImpl extends MessageExchangeImpl 
implements MyRoleMessageExchange {
@@ -128,11 +128,7 @@
             __log.debug("invoke() EPR= " + _epr + " ==> " + _process);
         
         try {
-            if (!processInterceptors(InterceptorInvoker.__onBpelServerInvoked, 
dao)) {
-                assert getStatus() == Status.ACK;
-                return dao;
-            }
-
+         
             _process.invokeProcess(dao);
         } finally {
             if (dao.getStatus() == Status.ACK) {
@@ -147,56 +143,8 @@
         
     }
 
-    /**
-     * Process the message-exchange interceptors.
-     * 
-     * @param mex
-     *            message exchange
-     * @return <code>true</code> if execution should continue, 
<code>false</code> otherwise
-     */
-    protected boolean processInterceptors(InterceptorInvoker invoker, 
MessageExchangeDAO mexDao) {
-        // TODO: should we give the in-mem dao connection for interceptors on 
in-mem processes?
-        InterceptorContextImpl ictx = new 
InterceptorContextImpl(_contexts.dao.getConnection(), mexDao.getProcess(), 
null);
-
-        for (MessageExchangeInterceptor i : _contexts.globalIntereceptors)
-            if (!processInterceptor(i, this, ictx, invoker, mexDao))
-                return false;
-
-        return true;
-    }
-
-    protected boolean processInterceptor(
-            MessageExchangeInterceptor i, 
-            MyRoleMessageExchangeImpl mex, 
-            InterceptorContext ictx,
-            InterceptorInvoker invoker,
-            MessageExchangeDAO mexdao) {
-        __log.debug(invoker + "--> interceptor " + i);
-        try {
-            invoker.invoke(i, mex, ictx);
-        } catch (FaultMessageExchangeException fme) {
-            __log.debug("interceptor " + i + " caused invoke on " + this + " 
to be aborted with FAULT " + fme.getFaultName());
-            MexDaoUtil.setFaulted(mexdao, fme.getFaultName(), 
fme.getFaultData() == null ? null : fme.getFaultData().getMessage());
-            return false;
-        } catch (AbortMessageExchangeException ame) {
-            __log.debug("interceptor " + i + " cause invoke on " + this + " to 
be aborted with FAILURE: " + ame.getMessage());
-            MexDaoUtil.setFailed(mexdao, FailureType.ABORTED, 
__msgs.msgInterceptorAborted(mex.getMessageExchangeId(), i
-                    .toString(), ame.getMessage()));
-            return false;
-        }
-        return true;
-    }
 
-   
-    
-    protected abstract void onAsyncAck(MessageExchangeDAO mexdao);
-    
-    
-    protected void finalize() {
-        _process.unregisterMyRoleMex(this);
-    }
-    
     
+    protected abstract void onAsyncAck(MessageExchangeDAO mexdao);    
 
- 
 }

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
 Wed Aug 15 10:21:24 2007
@@ -154,6 +154,7 @@
                 _response = response;
                 _fault = fault;
                 _failureType = failureType;
+                _explanation = explanation;
                 ack(ackType);
                 _future.done(Status.ACK);
 

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
 Wed Aug 15 10:21:24 2007
@@ -46,14 +46,14 @@
             __log.debug("asyncResponseReceived: for IID " + getIID() );
         }
 
-        _process.scheduleInstanceWork(getIID(), _process._server.new 
TransactedRunnable(new Runnable() {
+        _process.enqueueInstanceTransaction(getIID(), new Runnable() {
             public void run() {
                 MessageExchangeDAO dao = getDAO();
                 save(dao);
                 
_process.executeContinueInstancePartnerRoleResponseReceived(dao);
             }
 
-        }));
+        });
     }
 
     @Override

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java
 Wed Aug 15 10:21:24 2007
@@ -20,7 +20,7 @@
 
 import javax.xml.namespace.QName;
 
-import org.apache.ode.bpel.iapi.Message;
+import org.w3c.dom.Element;
 
 /**
  * Exception thrown by [EMAIL PROTECTED] 
org.apache.ode.bpel.intercept.MessageExchangeInterceptor}
@@ -32,9 +32,9 @@
        private static final long serialVersionUID = 1L;
 
        private QName _faultName;
-       private Message _faultData;
+       private Element _faultData;
 
-       public FaultMessageExchangeException(String errmsg, QName faultName, 
Message faultData) {
+       public FaultMessageExchangeException(String errmsg, QName faultName, 
Element faultData) {
                super(errmsg);
                
                _faultName = faultName;
@@ -45,7 +45,7 @@
                return _faultName;
        }
        
-       public Message getFaultData() {
+       public Element getFaultData() {
                return _faultData;
        }
 }

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
 Wed Aug 15 10:21:24 2007
@@ -18,10 +18,7 @@
  */
 package org.apache.ode.bpel.intercept;
 
-import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import 
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+import 
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorEvent;
 
 /**
  * Helper for invoking the appropriate [EMAIL PROTECTED] 
org.apache.ode.bpel.intercept.MessageExchangeInterceptor}
@@ -34,35 +31,28 @@
        private final String _name;
        // Closures anyone? 
        
-       /** Invoke [EMAIL PROTECTED] 
MessageExchangeInterceptor#onProcessInvoked(MyRoleMessageExchange, 
InterceptorContext)} */
+       /** Invoke [EMAIL PROTECTED] 
MessageExchangeInterceptor#onProcessInvoked(MyRoleMessageExchange, 
InterceptorEvent)} */
        public static final InterceptorInvoker  __onProcessInvoked= new 
InterceptorInvoker("onProcessInvoked") {
-               public void invoke(MessageExchangeInterceptor i, 
MessageExchange mex, InterceptorContext ictx) 
+               public void invoke(MessageExchangeInterceptor i, 
InterceptorEvent ictx) 
                        throws FailMessageExchangeException, 
FaultMessageExchangeException {
-                       i.onProcessInvoked((MyRoleMessageExchange) mex, ictx);
-               }
-       };
-       
-       /** Invoke [EMAIL PROTECTED] 
MessageExchangeInterceptor#onBpelServerInvoked(MyRoleMessageExchange, 
InterceptorContext)} */
-       public static final InterceptorInvoker __onBpelServerInvoked = new 
InterceptorInvoker("onBpelServerInvoked") {
-               public void invoke(MessageExchangeInterceptor i, 
MessageExchange mex, InterceptorContext ictx) 
-                       throws FailMessageExchangeException, 
FaultMessageExchangeException {
-                       i.onBpelServerInvoked((MyRoleMessageExchange) mex, 
ictx);
+                       i.onProcessInvoked(ictx);
                }
        };
 
-       /** Invoke [EMAIL PROTECTED] 
MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange, 
InterceptorContext)} */
+
+       /** Invoke [EMAIL PROTECTED] 
MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange, 
InterceptorEvent)} */
        public static final InterceptorInvoker __onPartnerInvoked = new 
InterceptorInvoker("onPartnerInvoked") {
-               public void invoke(MessageExchangeInterceptor i, 
MessageExchange mex, InterceptorContext ictx) 
+               public void invoke(MessageExchangeInterceptor i, 
InterceptorEvent ictx) 
                        throws FailMessageExchangeException, 
FaultMessageExchangeException {
-                       i.onPartnerInvoked((PartnerRoleMessageExchange) mex, 
ictx);
+                       i.onPartnerInvoked(ictx);
                }
        };
 
-       /** Invoke [EMAIL PROTECTED] 
MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange, 
InterceptorContext)} */
+       /** Invoke [EMAIL PROTECTED] 
MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange, 
InterceptorEvent)} */
        public static final InterceptorInvoker __onNewInstanceInvoked = new 
InterceptorInvoker("onNewInstanceInvoked") {
-               public void invoke(MessageExchangeInterceptor i, 
MessageExchange mex, InterceptorContext ictx) 
+               public void invoke(MessageExchangeInterceptor i, 
InterceptorEvent ictx) 
                        throws FailMessageExchangeException, 
FaultMessageExchangeException {
-                       i.onNewInstanceInvoked((MyRoleMessageExchange) mex, 
ictx);
+                       i.onNewInstanceInvoked(ictx);
                }
        };
 
@@ -71,7 +61,7 @@
                _name = name;
        }
        
-       public abstract void invoke(MessageExchangeInterceptor i, 
MessageExchange mex, InterceptorContext ictx)
+       public abstract void invoke(MessageExchangeInterceptor i, 
InterceptorEvent ictx)
                throws FailMessageExchangeException, 
FaultMessageExchangeException;
        
        public String toString() {

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
 Wed Aug 15 10:21:24 2007
@@ -19,14 +19,14 @@
 package org.apache.ode.bpel.intercept;
 
 import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 import org.apache.ode.bpel.iapi.ProcessConf;
 
 /**
- * Hook into the BPEL server that enables intercepting of message exchange
- * invocation.
+ * Hook into the BPEL server that enables intercepting of parntner/server 
invocations. This interface operates at 
+ * a level that is a bit lower than the IAPI, as it allows access to internal 
engine datastructures. Caution should
+ * be used when implementing interceptors. 
  * 
  * @author Maciej Szefler
  * 
@@ -40,7 +40,7 @@
      * @param mex
      *            message exchange
      */
-    void onBpelServerInvoked(MyRoleMessageExchange mex, InterceptorContext ic)
+    void onBpelServerInvoked(InterceptorEvent ic)
         throws FailMessageExchangeException, FaultMessageExchangeException;
 
     /**
@@ -50,7 +50,7 @@
      * @param mex
      *            message exchange
      */
-    void onProcessInvoked(MyRoleMessageExchange mex, InterceptorContext ic)
+    void onProcessInvoked(InterceptorEvent ic)
         throws FailMessageExchangeException, FaultMessageExchangeException;
 
     /**
@@ -61,7 +61,7 @@
      * @param mex
      *            message exchange
      */
-    void onNewInstanceInvoked(MyRoleMessageExchange mex, InterceptorContext ic)
+    void onNewInstanceInvoked(InterceptorEvent ic)
         throws FailMessageExchangeException, FaultMessageExchangeException;
 
     /**
@@ -71,17 +71,30 @@
      * @param mex
      *            message exchange
      */
-    void onPartnerInvoked(PartnerRoleMessageExchange mex, InterceptorContext 
ic)
+    void onPartnerInvoked(InterceptorEvent ic)
         throws FailMessageExchangeException, FaultMessageExchangeException;
 
 
-    public interface InterceptorContext {
+    /**
+     * Representation of an intercept event. 
+     * 
+     * @author Maciej Szefler <mszefler at gmail dot com>
+     *
+     */
+    public interface InterceptorEvent {
 
+        /** Get the connection to the data store. */
         BpelDAOConnection getConnection();
-
+        
+        /** Get the DB representation of the process. */
         ProcessDAO getProcessDAO();
 
+        /** Get the process configuration. */
         ProcessConf getProcessConf();
+        
+        /** Get the database representation of the message exchange. */
+        MessageExchangeDAO getMessageExchangeDAO();
+        
 
     }
 }

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
 Wed Aug 15 10:21:24 2007
@@ -31,25 +31,20 @@
  */
 public class NoOpInterceptor implements MessageExchangeInterceptor {
 
-       public void onBpelServerInvoked(MyRoleMessageExchange mex,
-                       InterceptorContext ic) throws 
FailMessageExchangeException,
-                       FaultMessageExchangeException {
-       }
+    public void onBpelServerInvoked(InterceptorEvent ic) throws 
FailMessageExchangeException, FaultMessageExchangeException {
+        
+    }
 
-       public void onProcessInvoked(MyRoleMessageExchange mex,
-                       InterceptorContext ic) throws 
FailMessageExchangeException,
-                       FaultMessageExchangeException {
-       }
+    public void onNewInstanceInvoked(InterceptorEvent ic) throws 
FailMessageExchangeException, FaultMessageExchangeException {
+        
+    }
 
-       public void onPartnerInvoked(PartnerRoleMessageExchange mex,
-                       InterceptorContext ic) throws 
FailMessageExchangeException,
-                       FaultMessageExchangeException {
-       }
+    public void onPartnerInvoked(InterceptorEvent ic) throws 
FailMessageExchangeException, FaultMessageExchangeException {
 
-       public void onNewInstanceInvoked(MyRoleMessageExchange mex,
-                       InterceptorContext ic) throws 
FailMessageExchangeException,
-                       FaultMessageExchangeException {
+    }
 
-       }
+    public void onProcessInvoked(InterceptorEvent ic) throws 
FailMessageExchangeException, FaultMessageExchangeException {
+        
+    }
 
 }

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java?view=diff&rev=566271&r1=566270&r2=566271
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
 Wed Aug 15 10:21:24 2007
@@ -18,12 +18,12 @@
  */
 package org.apache.ode.bpel.intercept;
 
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.w3c.dom.Node;
-import org.w3c.dom.Text;
+import java.util.Map;
 
 import javax.xml.namespace.QName;
-import java.util.Map;
+
+import org.w3c.dom.Node;
+import org.w3c.dom.Text;
 
 /**
  * An example of a  simple interceptor providing a "throttling"  capability - 
that is an 
@@ -36,8 +36,7 @@
     private static final QName PROP_MAX_INSTANCES = new 
QName("urn:org.apache.ode.bpel.intercept", "maxInstances");
 
     @Override
-    public void onNewInstanceInvoked(MyRoleMessageExchange mex,
-                                     InterceptorContext ic) throws 
FailMessageExchangeException {
+    public void onNewInstanceInvoked(InterceptorEvent ic) throws 
FailMessageExchangeException {
         int maxInstances;
         try {
             maxInstances = 
Integer.valueOf(getSimpleProperty(PROP_MAX_INSTANCES, ic));
@@ -56,7 +55,7 @@
      * @param ic interceptor context
      * @return value of the property, or <code>null</code> if not set
      */
-    private String getSimpleProperty(QName propertyName, InterceptorContext 
ic) {
+    private String getSimpleProperty(QName propertyName, InterceptorEvent ic) {
         Map<QName, Node> props =  ic.getProcessConf().getProperties();
         for (Map.Entry<QName, Node> prop : props.entrySet()) {
             if (prop.getKey().equals(propertyName))


Reply via email to