Author: mszefler
Date: Thu Aug  2 10:26:37 2007
New Revision: 562188

URL: http://svn.apache.org/viewvc?view=rev&rev=562188
Log:
Fix message exchanges status updates, to occur after commit

Added:
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
   (with props)
Modified:
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
 Thu Aug  2 10:26:37 2007
@@ -11,6 +11,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.o.OPartnerLink;
@@ -51,10 +52,14 @@
         if (_future != null)
             return _future;
         
+        if (_request == null)
+            throw new IllegalStateException("Must call setRequest(...)!");
+        
         _future = new ResponseFuture();
         _process.enqueueTransaction(new Callable<Void>() {
 
             public Void call() throws Exception {
+                AsyncMyRoleMessageExchangeImpl.super.setStatus(Status.REQUEST);
                 MessageExchangeDAO dao = 
_process.createMessageExchange(getMessageExchangeId(), 
MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
                 save(dao);
                 if (_process.isInMemory()) 
@@ -93,9 +98,7 @@
                 if (_status != null)
                     return _status;
 
-                while (_status == null) {
-                    this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
-                }
+                this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
 
                 if (_status == null)
                     throw new TimeoutException();

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
 Thu Aug  2 10:26:37 2007
@@ -6,10 +6,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.bpel.dao.ProcessInstanceDAO;
 import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.runtime.PROCESS;
 
 /**
  * Objects used for synchronizing the execution of instance-level work. All 
work on behalf of an instance is funneled to one of
@@ -34,6 +31,8 @@
     private ArrayList<Runnable> _todoQueue = new ArrayList<Runnable>();
 
     private final ThreadLocal<Long> _activeInstance = new ThreadLocal<Long>();
+    
+    private Thread _workerThread;
 
     BpelInstanceWorker(BpelProcess process, Long iid) {
         _process = process;
@@ -51,11 +50,12 @@
      * @param runnable
      */
     synchronized void enqueue(Runnable runnable) {
+        __log.debug("Enqueue work for instance IID " + _iid + ": " + runnable);
         _todoQueue.add(runnable);
         // We mayh need to reschedule this thread if we've dropped out of the 
end of the run() method.
         if (!_running) {
             _running = true;
-            _process.scheduleRunnable(this);
+            _process.enqueueRunnable(this);
         }
     }
  
@@ -74,7 +74,8 @@
      * @throws Exception
      *             forwarded from [EMAIL PROTECTED] Callable#call()}
      */
-    synchronized <T> T execInCurrentThread(Callable<T> callable) throws 
Exception {
+    <T> T execInCurrentThread(Callable<T> callable) throws Exception {
+        __log.debug("Importing thread " + Thread.currentThread() + " for IID " 
+ _iid);
         final Semaphore ready = new Semaphore(0);
         final Semaphore finished = new Semaphore(0);
         enqueue(new Runnable() {
@@ -88,6 +89,8 @@
                 }
             }
         });
+        
+        __log.debug("Blocking main worker thread for IID " + _iid);
         try {
             ready.acquire();
         } catch (InterruptedException ex) {
@@ -95,12 +98,14 @@
             throw new BpelEngineException("Thread interrupted.", ex);
         }
 
+        __log.debug("Executing worker for IID " + _iid + " in imported thread 
" + Thread.currentThread());
         _activeInstance.set(_iid);
         try {
             return callable.call();
         } catch (Exception ex) {
             throw ex;
         } finally {
+            __log.debug("Releasing worker thread for IID " + _iid + " imported 
thread " + Thread.currentThread());
             finished.release();
             _activeInstance.set(null);
         }
@@ -113,7 +118,9 @@
      * Implementation of the [EMAIL PROTECTED] Runnable} interface.
      */
     public void run() {
+        __log.debug("Running worker thread " + Thread.currentThread() + " for 
instance IID " + _iid);
         _activeInstance.set(_iid);
+        _workerThread = Thread.currentThread();
         try {
 
             do {
@@ -123,10 +130,12 @@
                         // This is the only way to drop out of this method 
short of some disasterous error. This is
                         // important since we need to synchronize _running 
with _todoQueue state.
                         _running = false;
+                        __log.debug("Worker thread " + Thread.currentThread() 
+ " for instance IID " + _iid + " ran out of work. ");
                         return;
                     }
 
                     next = _todoQueue.remove(0);
+                    __log.debug("Worker thread "  + Thread.currentThread() + " 
for instance IID " + _iid + " found work: " + next);
                 }
 
                 try {
@@ -137,11 +146,12 @@
             } while (true);
         } finally {
             _activeInstance.set(null);
+            _workerThread = null;
         }
     }
 
     public String toString() {
-        return "{BpelInstanceWorker: PID=" + _process.getPID() + " IID=" + 
_iid + "}";
+        return "{BpelInstanceWorker: PID=" + _process.getPID() + " IID=" + 
_iid + " workerThread="+ _workerThread + "}";
     }
 
     public boolean isWorkerThread() {

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 Thu Aug  2 10:26:37 2007
@@ -193,8 +193,9 @@
                 __log.error(errmsg);
                 
mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_ENDPOINT.toString());
                 mexdao.setFaultExplanation(errmsg);
+                Status oldstatus = Status.valueOf(mexdao.getStatus());
                 mexdao.setStatus(Status.FAILURE.toString());
-                fireMexStateEvent(mexdao);
+                fireMexStateEvent(mexdao, oldstatus, Status.FAILURE);
                 return;
             }
 
@@ -242,7 +243,7 @@
 
     private void executeCreateInstance(MessageExchangeDAO mexdao) {
         assert _hydrationLatch.isLatched(1);
-        
+
         BpelInstanceWorker worker = 
_instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
         assert worker.isWorkerThread();
         BpelRuntimeContextImpl instanceCtx = new 
BpelRuntimeContextImpl(worker, mexdao.getInstance(), new PROCESS(_oprocess),
@@ -301,7 +302,7 @@
 
     private PartnerLinkMyRoleImpl getMyRoleForService(QName serviceName) {
         assert _hydrationLatch.isLatched(1);
-        
+
         for (Map.Entry<Endpoint, PartnerLinkMyRoleImpl> e : 
_endpointToMyRoleMap.entrySet()) {
             if (e.getKey().serviceName.equals(serviceName))
                 return e.getValue();
@@ -359,7 +360,6 @@
             return null;
     }
 
-    
     /**
      * Process the message-exchange interceptors.
      * 
@@ -697,35 +697,31 @@
         }
     }
 
-    private MyRoleMessageExchangeImpl newMyRoleMex(
-            InvocationStyle istyle, 
-            String mexId, 
-            QName target, 
-            OPartnerLink oplink, 
+    private MyRoleMessageExchangeImpl newMyRoleMex(InvocationStyle istyle, 
String mexId, QName target, OPartnerLink oplink,
             Operation operation) {
         MyRoleMessageExchangeImpl mex;
         switch (istyle) {
         case RELIABLE:
-            mex = new ReliableMyRoleMessageExchangeImpl(this, mexId, 
oplink,operation, target);
+            mex = new ReliableMyRoleMessageExchangeImpl(this, mexId, oplink, 
operation, target);
             break;
         case ASYNC:
-            mex = new AsyncMyRoleMessageExchangeImpl(this, 
mexId,oplink,operation,target);
+            mex = new AsyncMyRoleMessageExchangeImpl(this, mexId, oplink, 
operation, target);
             break;
         case TRANSACTED:
-            mex = new TransactedMyRoleMessageExchangeImpl(this, mexId, 
oplink,operation,target);
+            mex = new TransactedMyRoleMessageExchangeImpl(this, mexId, oplink, 
operation, target);
             break;
         case BLOCKING:
-            mex = new BlockingMyRoleMessageExchangeImpl(this, mexId, 
oplink,operation,target);
+            mex = new BlockingMyRoleMessageExchangeImpl(this, mexId, oplink, 
operation, target);
             break;
         default:
             throw new AssertionError("Unexpected invocation style: " + istyle);
 
         }
-        
+
         registerMyRoleMex(mex);
         return mex;
     }
-    
+
     MyRoleMessageExchangeImpl recreateMyRoleMex(MessageExchangeDAO mexdao) {
         InvocationStyle istyle = 
InvocationStyle.valueOf(mexdao.getInvocationStyle());
 
@@ -733,26 +729,30 @@
         try {
             OPartnerLink plink = (OPartnerLink) 
_oprocess.getChild(mexdao.getPartnerLinkModelId());
             if (plink == null) {
-                String errmsg = __msgs.msgDbConsistencyError("MexDao #"+ 
mexdao.getMessageExchangeId() + " referenced unknown pLinkModelId " + 
mexdao.getPartnerLinkModelId());
+                String errmsg = __msgs.msgDbConsistencyError("MexDao #" + 
mexdao.getMessageExchangeId()
+                        + " referenced unknown pLinkModelId " + 
mexdao.getPartnerLinkModelId());
                 __log.error(errmsg);
                 throw new BpelEngineException(errmsg);
             }
-            
-            Operation op  = plink.getMyRoleOperation(mexdao.getOperation());
+
+            Operation op = plink.getMyRoleOperation(mexdao.getOperation());
             if (op == null) {
-                String errmsg = __msgs.msgDbConsistencyError("MexDao #"+ 
mexdao.getMessageExchangeId() + " referenced unknown operation " + 
mexdao.getOperation());
+                String errmsg = __msgs.msgDbConsistencyError("MexDao #" + 
mexdao.getMessageExchangeId()
+                        + " referenced unknown operation " + 
mexdao.getOperation());
                 __log.error(errmsg);
-                throw new BpelEngineException(errmsg);                
+                throw new BpelEngineException(errmsg);
             }
-            
+
             PartnerLinkMyRoleImpl myRole = _myRoles.get(plink);
             if (myRole == null) {
-                String errmsg = __msgs.msgDbConsistencyError("MexDao #"+ 
mexdao.getMessageExchangeId() + " referenced non-existant myrole");
+                String errmsg = __msgs.msgDbConsistencyError("MexDao #" + 
mexdao.getMessageExchangeId()
+                        + " referenced non-existant myrole");
                 __log.error(errmsg);
-                throw new BpelEngineException(errmsg);                         
       
+                throw new BpelEngineException(errmsg);
             }
-            
-            MyRoleMessageExchangeImpl mex = newMyRoleMex(istyle, 
mexdao.getMessageExchangeId(), myRole._endpoint.serviceName, plink, op);
+
+            MyRoleMessageExchangeImpl mex = newMyRoleMex(istyle, 
mexdao.getMessageExchangeId(), myRole._endpoint.serviceName,
+                    plink, op);
             mex.load(mexdao);
             return mex;
         } finally {
@@ -815,7 +815,7 @@
      */
     private PartnerLinkMyRoleImpl getPartnerLinkForService(QName serviceName) {
         assert _hydrationLatch.isLatched(1);
-        
+
         PartnerLinkMyRoleImpl target = null;
         for (Endpoint endpoint : _endpointToMyRoleMap.keySet()) {
             if (endpoint.serviceName.equals(serviceName))
@@ -825,9 +825,10 @@
         return target;
 
     }
-    
+
     /**
-     * Used by [EMAIL PROTECTED] BpelRuntimeContextImpl} constructor. Should 
only be called from latched context. 
+     * Used by [EMAIL PROTECTED] BpelRuntimeContextImpl} constructor. Should 
only be called from latched context.
+     * 
      * @return
      */
     ReplacementMap getReplacementMap() {
@@ -1028,6 +1029,13 @@
         _server.scheduleRunnable(new ProcessRunnable(runnable));
     }
 
+    public void enqueueRunnable(BpelInstanceWorker worker) {
+        if (__log.isDebugEnabled())
+            __log.debug("enqueuRunnable for process " + _pid + ": " + worker);
+
+        _server.enqueueRunnable(new ProcessRunnable(worker));
+    }
+
     class ProcessRunnable implements Runnable {
         Runnable _work;
 
@@ -1066,11 +1074,9 @@
 
     }
 
-    public MyRoleMessageExchange createNewMyRoleMex(final InvocationStyle 
istyle, 
-            final QName targetService,
-            final String operation, 
-            final String clientKey) {
-        
+    public MyRoleMessageExchange createNewMyRoleMex(final InvocationStyle 
istyle, final QName targetService,
+            final String operation, final String clientKey) {
+
         final String mexId = new GUID().toString();
         _hydrationLatch.latch(1);
         try {
@@ -1082,8 +1088,8 @@
             if (op == null)
                 throw new BpelEngineException("NoSuchOperation: " + operation);
 
-            return newMyRoleMex(istyle, mexId, target._endpoint.serviceName, 
target._plinkDef, op);  
-            
+            return newMyRoleMex(istyle, mexId, target._endpoint.serviceName, 
target._plinkDef, op);
+
         } finally {
             _hydrationLatch.release(1);
         }
@@ -1092,24 +1098,25 @@
     void registerMyRoleMex(MyRoleMessageExchangeImpl mymex) {
         _mexStateListeners.add(new 
WeakReference<MyRoleMessageExchangeImpl>(mymex));
     }
-    
+
     void unregisterMyRoleMex(MyRoleMessageExchangeImpl mymex) {
         ArrayList<WeakReference<MyRoleMessageExchangeImpl>> needsRemoval = new 
ArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
-        for (WeakReference<MyRoleMessageExchangeImpl> wref : 
_mexStateListeners) { 
+        for (WeakReference<MyRoleMessageExchangeImpl> wref : 
_mexStateListeners) {
             MyRoleMessageExchangeImpl mex = wref.get();
             if (mex == null || mex == mymex)
                 needsRemoval.add(wref);
         }
         _mexStateListeners.removeAll(needsRemoval);
-            
+
     }
-    
-    void fireMexStateEvent(MessageExchangeDAO mexdao) {
-        for (WeakReference<MyRoleMessageExchangeImpl> wr:  _mexStateListeners) 
{
-            MyRoleMessageExchangeImpl mymex = wr.get();
-            if (mymex != null && mymex.getMessageExchangeId() != null)
-                mymex.onStateChanged(mexdao);
-        }
+
+    void fireMexStateEvent(MessageExchangeDAO mexdao, Status old, Status news) 
{
+        if (old != news)
+            for (WeakReference<MyRoleMessageExchangeImpl> wr : 
_mexStateListeners) {
+                MyRoleMessageExchangeImpl mymex = wr.get();
+                if (mymex != null && mymex.getMessageExchangeId() != null)
+                    mymex.onStateChanged(mexdao, old, news);
+            }
 
     }
 

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 Thu Aug  2 10:26:37 2007
@@ -153,7 +153,7 @@
         _outstandingRequests = new OutstandingRequestManager();
         _vpu.setContext(_soup);
 
-        byte[] daoState = dao.getExecutionState();
+        byte[] daoState = _bpelProcess.isInMemory() ? null : 
dao.getExecutionState();
         if (daoState != null) {
             assert !_bpelProcess.isInMemory() : "did not expect to rehydrate 
in-mem process!";
             ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
@@ -535,8 +535,10 @@
             evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT);
         }
 
+        Status previousStatus = Status.valueOf(myrolemex.getStatus());
         myrolemex.setStatus(status.toString());
-
+        _bpelProcess.fireMexStateEvent(myrolemex, previousStatus, status);
+        
         if (myrolemex.getPipedMessageExchange() != null) /* p2p case */{
             MessageExchangeDAO pmex = myrolemex.getPipedMessageExchange();
 

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
 Thu Aug  2 10:26:37 2007
@@ -157,6 +157,18 @@
 
             if (_exec == null)
                 _exec = Executors.newCachedThreadPool();
+
+            if (_contexts.txManager == null) {
+                String errmsg = "Transaction manager not specified; call 
setTransactionManager(...)!";
+                __log.fatal(errmsg);
+                throw new IllegalStateException(errmsg);
+            }
+            
+            if (_contexts.scheduler == null) { 
+                String errmsg = "Scheduler not specified; call 
setScheduler(...)!";
+                __log.fatal(errmsg);
+                throw new IllegalStateException(errmsg);
+            }
             
             _contexts.scheduler.start();
             _state = State.RUNNING;
@@ -305,6 +317,9 @@
 
         try {
             BpelProcess p = _registeredProcesses.remove(pid);
+            if (p == null)
+                return;
+            
             p.deactivate();
             while (_serviceMap.values().remove(p))
                 ;
@@ -598,14 +613,26 @@
         return _exec.submit(new ServerCallable<T>(new 
TransactedCallable<T>(transaction)));
     }
 
+    void enqueueRunnable(final Runnable runnable) {
+        _exec.submit(new ServerRunnable(runnable));
+    }
+    
     /**
      * Schedule a [EMAIL PROTECTED] Runnable} object for execution after the 
completion of the current transaction. 
      * @param runnable
      */
-    void scheduleRunnable(Runnable runnable) {
+    void scheduleRunnable(final Runnable runnable) {
         assertTransaction();
-        _contexts.registerCommitSynchronizer(new ServerRunnable(runnable));
+        _contexts.registerCommitSynchronizer(new Runnable() {
+
+            public void run() {
+                _exec.submit(new ServerRunnable(runnable));
+            }
+            
+        });
+        
     }
+
     
     protected void assertTransaction() {
         if (!_contexts.isTransacted())
@@ -730,6 +757,9 @@
             _mngmtLock.readLock().lock();
             try {
                 return _work.call();
+            } catch (Exception ex) {
+                __log.fatal("Internal Error", ex);
+                throw ex;
             } finally {
                 _mngmtLock.readLock().unlock();
             }

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
 Thu Aug  2 10:26:37 2007
@@ -19,6 +19,8 @@
 
 package org.apache.ode.bpel.engine;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.iapi.BindingContext;
 import org.apache.ode.bpel.iapi.BpelEngineException;
@@ -42,7 +44,8 @@
  * Aggregation of all the contexts provided to the BPEL engine by the 
integration layer.
  */
 class Contexts {
-
+    private static final Log __log = LogFactory.getLog(Contexts.class);
+    
     TransactionManager txManager;
 
     MessageExchangeContext mexContext;
@@ -104,14 +107,14 @@
                 try {
                     txManager.commit();
                 } catch (Exception ex) {
-                    throw new BpelEngineException("Could not commit.", ex);
+                    __log.error("Commit failed.", ex);                    
+                    throw new BpelEngineException("Commit failed.", ex);
                 }
             else
                 try {
                     txManager.rollback();
                 } catch (Exception ex) {
-                    throw new BpelEngineException("Could not rollback.", ex);
-
+                    __log.error("Transaction rollback failed.", ex);
                 }
         }
     }

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
 Thu Aug  2 10:26:37 2007
@@ -125,7 +125,7 @@
     /** Names of proprties that have been modified. */
     final HashSet<String> _modifiedProperties = new HashSet<String>();
 
-    private FailureType _failureType;
+    protected FailureType _failureType;
 
     private Set<String> _propNames;
 
@@ -176,12 +176,14 @@
 
         if (_changes.contains(Change.REQUEST)) {
             MessageDAO requestDao = dao.createMessage(_request.getType());
-            requestDao.setData(_request.getMessage());            
+            requestDao.setData(_request.getMessage());   
+            dao.setRequest(requestDao);
         }
         
         if (_changes.contains(Change.RESPONSE)) {
             MessageDAO responseDao = dao.createMessage(_response.getType());
             responseDao.setData(_response.getMessage());
+            dao.setResponse(responseDao);
         }
 
         if (_changes.contains(Change.EPR)) {
@@ -297,40 +299,7 @@
     }
 
     
-    void setFault(QName faultType, Message outputFaultMessage) throws 
BpelEngineException {
-        setStatus(Status.FAULT);
-        _fault = faultType;
-        _response = (MessageImpl) outputFaultMessage;
-
-        _changes.add(Change.RESPONSE);
-    }
-
-    void setFaultExplanation(String explanation) {
-        _explanation = explanation;
-    }
-
-    void setResponse(Message outputMessage) throws BpelEngineException {
-        if (getStatus() != Status.REQUEST && getStatus() != Status.ASYNC)
-            throw new IllegalStateException("Not in REQUEST state!");
-
-        setStatus(Status.RESPONSE);
-        _fault = null;
-        _explanation = null;
-        _response = (MessageImpl) outputMessage;
-        _response.makeReadOnly();
-        _changes.add(Change.RESPONSE);
-
-    }
-
-    void setFailure(FailureType type, String reason, Element details) throws 
BpelEngineException {
-        // TODO not using FailureType, nor details
-        setStatus(Status.FAILURE);
-        _failureType = type;
-        _explanation = reason;
-
-        _changes.add(Change.RESPONSE);
-    }
-
+   
     void setStatus(Status status) {
         _status = status;
     }

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
 Thu Aug  2 10:26:37 2007
@@ -1,6 +1,5 @@
 package org.apache.ode.bpel.engine;
 
-import java.lang.ref.WeakReference;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
@@ -9,37 +8,37 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.intercept.AbortMessageExchangeException;
 import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 import 
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
 import org.apache.ode.bpel.o.OPartnerLink;
+import org.w3c.dom.Element;
 
 abstract class MyRoleMessageExchangeImpl extends MessageExchangeImpl 
implements MyRoleMessageExchange {
 
     private static final Log __log = 
LogFactory.getLog(MyRoleMessageExchangeImpl.class);
-    
+
     protected final QName _callee;
 
     protected CorrelationStatus _cstatus;
 
     protected String _clientId;
 
-    public MyRoleMessageExchangeImpl(BpelProcess process,
-            String mexId, 
-            OPartnerLink oplink, 
-            Operation operation,
-            QName callee) {
+    public MyRoleMessageExchangeImpl(BpelProcess process, String mexId, 
OPartnerLink oplink, Operation operation, QName callee) {
         super(process, mexId, oplink, oplink.myRolePortType, operation);
         _callee = callee;
     }
-        
+
     public CorrelationStatus getCorrelationStatus() {
         return _cstatus;
     }
@@ -47,15 +46,16 @@
     @Override
     void load(MessageExchangeDAO dao) {
         super.load(dao);
-        _cstatus = CorrelationStatus.valueOf(dao.getCorrelationStatus());
+        _cstatus = dao.getCorrelationStatus() == null ? null : 
CorrelationStatus.valueOf(dao.getCorrelationStatus());
         _clientId = dao.getCorrelationId();
     }
 
     @Override
     public void save(MessageExchangeDAO dao) {
         super.save(dao);
-        dao.setCorrelationStatus(_cstatus.toString());
+        dao.setCorrelationStatus(_cstatus == null ? null : 
_cstatus.toString());
         dao.setCorrelationId(_clientId);
+        dao.setCallee(_callee);
     }
 
     public String getClientId() {
@@ -102,28 +102,27 @@
     }
 
     protected void scheduleInvoke() {
-        
+
         assert !_process.isInMemory() : "Cannot schedule invokes for in-memory 
processes.";
         assert _contexts.isTransacted() : "Cannot schedule outside of 
transaction context.";
-        
+
         // Schedule a new job for invocation
         final WorkEvent we = new WorkEvent();
         we.setType(WorkEvent.Type.MYROLE_INVOKE);
         we.setProcessId(_process.getPID());
         we.setMexId(_mexId);
 
-        // Schedule a timeout 
+        // Schedule a timeout
         final WorkEvent we1 = new WorkEvent();
         we1.setType(WorkEvent.Type.MYROLE_INVOKE_TIMEOUT);
         we1.setProcessId(_process.getPID());
         we1.setMexId(_mexId);
-        
+
         setStatus(Status.ASYNC);
         _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
         _contexts.scheduler.schedulePersistedJob(we1.getDetail(), null);
 
     }
-    
 
     /**
      * Process the message-exchange interceptors.
@@ -133,7 +132,7 @@
      * @return <code>true</code> if execution should continue, 
<code>false</code> otherwise
      */
     protected boolean processInterceptors(InterceptorInvoker invoker, 
MessageExchangeDAO mexDao) {
-        // TODO: should we give the in-mem dao connection for interceptors on 
in-mem processes? 
+        // TODO: should we give the in-mem dao connection for interceptors on 
in-mem processes?
         InterceptorContextImpl ictx = new 
InterceptorContextImpl(_contexts.dao.getConnection(), mexDao.getProcess(), 
null);
 
         for (MessageExchangeInterceptor i : _contexts.globalIntereceptors)
@@ -150,25 +149,82 @@
             invoker.invoke(i, mex, ictx);
         } catch (FaultMessageExchangeException fme) {
             __log.debug("interceptor " + i + " caused invoke on " + this + " 
to be aborted with FAULT " + fme.getFaultName());
-            mex.setFault(fme.getFaultName(), fme.getFaultData());
+            mex.serverFaulted(fme.getFaultName(), fme.getFaultData());
             return false;
         } catch (AbortMessageExchangeException ame) {
             __log.debug("interceptor " + i + " cause invoke on " + this + " to 
be aborted with FAILURE: " + ame.getMessage());
-            mex.setFailure(MessageExchange.FailureType.ABORTED, 
__msgs.msgInterceptorAborted(mex.getMessageExchangeId(), i
+            mex.serverFailed(MessageExchange.FailureType.ABORTED, 
__msgs.msgInterceptorAborted(mex.getMessageExchangeId(), i
                     .toString(), ame.getMessage()), null);
             return false;
         }
         return true;
     }
 
+    protected void onStateChanged(MessageExchangeDAO mexdao, Status oldstatus, 
final Status newstatus) {
+        MessageDAO response = mexdao.getResponse();
+        switch (newstatus) {
+        case RESPONSE: {
+            final Element msg = response.getData();
+            final QName msgtype = response.getType();
+            _process.scheduleRunnable(new Runnable() {
+                public void run() {
+                    serverResponded(new MemBackedMessageImpl(msg, msgtype, 
true));
+                }
+            });
+        }
+            break;
+        case FAULT: {
+            final QName fault = mexdao.getFault();
+            final Element faultMsg = response.getData();
+            final QName msgtype = response.getType();
+            _process.scheduleRunnable(new Runnable() {
+                public void run() {
+                    serverFaulted(fault, new MemBackedMessageImpl(faultMsg, 
msgtype, true));
+                }
 
-    protected void onStateChanged(MessageExchangeDAO mexdao) {
-        setStatus(Status.valueOf(mexdao.getStatus()));
+            });
+        }
+            break;
+        case FAILURE:
+            final String failureExplanation = mexdao.getFaultExplanation();
+            final FailureType ftype = 
FailureType.valueOf(mexdao.getFailureType());
+            _process.scheduleRunnable(new Runnable() {
+                public void run() {
+                    serverFailed(ftype, failureExplanation, null); // TODO add 
failure detail
+                }
+
+            });
+            break;
+        }
     }
-    
-    
+
     protected void finalize() {
         _process.unregisterMyRoleMex(this);
+    }
+
+    
+    void serverFaulted(QName faultType, Message outputFaultMessage) throws 
BpelEngineException {
+        _fault = faultType;
+        _response = (MessageImpl) outputFaultMessage;
+        setStatus(Status.FAULT);
+    }
+
+   
+    void serverResponded(Message outputMessage) {
+        _fault = null;
+        _explanation = null;
+        _response = (MessageImpl) outputMessage;
+        _response.makeReadOnly();
+        setStatus(Status.RESPONSE);
+
+    }
+
+    void serverFailed(FailureType type, String reason, Element details) {
+        // TODO not using FailureType, nor details
+        _failureType = type;
+        _explanation = reason;
+        setStatus(Status.FAILURE);
+
     }
 
 }

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
 Thu Aug  2 10:26:37 2007
@@ -102,7 +102,9 @@
         }
         sync();
         checkReplyContextOk();
-        setFault(faultType, outputFaultMessage);
+        _fault = faultType;
+        _response = (MessageImpl) outputFaultMessage;
+        setStatus(Status.FAULT);
         sync();
         if (!_blocked)
             resumeInstance();
@@ -114,7 +116,8 @@
         }
         sync();
         checkReplyContextOk();
-        setResponse(response);
+        _response = (MessageImpl) response;
+        setStatus(Status.RESPONSE);
         sync();
         if (!_blocked)
             resumeInstance();
@@ -127,7 +130,9 @@
         }
         sync();
         checkReplyContextOk();
-        setFailure(type, description, details);
+        _failureType = type;
+        _explanation = description;
+        setStatus(Status.FAILURE);
         sync();
         if (!_blocked)
             resumeInstance();

Added: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java?view=auto&rev=562188
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
 (added)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
 Thu Aug  2 10:26:37 2007
@@ -0,0 +1,50 @@
+package org.apache.ode.bpel.engine;
+
+import javax.wsdl.Operation;
+
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.iapi.MessageExchangeContext;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.o.OPartnerLink;
+
+
+/**
+ * 
+ * 
+ * @author Maciej Szefler
+ *
+ */
+public class TransactedPartnerRoleMessageExchangeImpl extends 
PartnerRoleMessageExchangeImpl {
+
+    TransactedPartnerRoleMessageExchangeImpl(BpelProcess process, String 
mexId, OPartnerLink oplink, Operation operation,EndpointReference epr, 
EndpointReference myRoleEPR, PartnerRoleChannel channel) {
+        super(process, mexId, oplink, operation,  epr, myRoleEPR, channel);
+    }
+    
+    
+    /**
+     * The criteria for issuing a replyXXX call on TRANSACTED message 
exchanges is that the replyXXX must come while the
+     * engine is blocked in an  
+     * [EMAIL PROTECTED] 
MessageExchangeContext#invokePartnerBlocking(org.apache.ode.bpel.iapi.PartnerRoleMessageExchange)}.
 
+     * method, AND the call must come from the engine thread. 
+     */
+    @Override
+    protected void checkReplyContextOk() {
+        if (!_blocked)
+            throw new BpelEngineException("replyXXX operation attempted 
outside of BLOCKING region!");
+        if (!_ownerThread.get())
+            throw new BpelEngineException("replyXXX operation attempted from 
foreign thread!");
+        
+        assert _contexts.isTransacted() : "Internal Error: owner thread must 
be transactional!?!?!!?"; 
+    }
+
+
+    @Override
+    public InvocationStyle getInvocationStyle() {
+        return InvocationStyle.TRANSACTED;
+    }
+
+
+
+}

Propchange: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?view=diff&rev=562188&r1=562187&r2=562188
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
 Thu Aug  2 10:26:37 2007
@@ -277,9 +277,12 @@
 
     public void setFailureType(String failureType) {
         _failureType = failureType;
-        
     }
 
+    public String getFailureType() {
+        return _failureType;
+    }
+    
     public void setInvocationStyle(String invocationStyle) {
         _istyle = invocationStyle;
         


Reply via email to