Author: mszefler
Date: Tue Aug  7 11:06:05 2007
New Revision: 563599

URL: http://svn.apache.org/viewvc?view=rev&rev=563599
Log:
BART tweaks: open up scheduling for in-memory processes to assist testing.

Modified:
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
 Tue Aug  7 11:06:05 2007
@@ -34,6 +34,8 @@
     
     private Thread _workerThread;
 
+    private CachedState _cachedState;
+
     BpelInstanceWorker(BpelProcess process, Long iid) {
         _process = process;
         _iid = iid;
@@ -154,8 +156,29 @@
         return "{BpelInstanceWorker: PID=" + _process.getPID() + " IID=" + 
_iid + " workerThread="+ _workerThread + "}";
     }
 
-    public boolean isWorkerThread() {
+    boolean isWorkerThread() {
         return _activeInstance.get() != null;
+    }
+
+    Object getCachedState(Object uuid) {
+        CachedState cs = _cachedState;
+        if (cs != null && cs.uuid.equals(uuid))
+            return cs.state;
+        return null;
+    }
+    
+    void setCachedState(Object uuid, Object state) {
+        _cachedState = new CachedState(uuid, state);
+    }
+    
+    private class CachedState {
+        final Object uuid;
+        final Object state;
+        
+        CachedState(Object uuid, Object state) {
+            this.uuid = uuid;
+            this.state = state;
+        }
     }
 
 }

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 Tue Aug  7 11:06:05 2007
@@ -22,6 +22,7 @@
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -67,6 +68,7 @@
 import org.apache.ode.bpel.o.OProcess;
 import org.apache.ode.bpel.o.Serializer;
 import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
+import org.apache.ode.bpel.runtime.InvalidProcessException;
 import org.apache.ode.bpel.runtime.PROCESS;
 import org.apache.ode.bpel.runtime.PropertyAliasEvaluationContext;
 import org.apache.ode.bpel.runtime.channels.FaultData;
@@ -274,7 +276,7 @@
         BpelInstanceWorker worker = 
_instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
         assert worker.isWorkerThread();
 
-        BpelRuntimeContextImpl instance = new BpelRuntimeContextImpl(worker, 
mexdao.getInstance(), null, null);
+        BpelRuntimeContextImpl instance = new BpelRuntimeContextImpl(worker, 
mexdao.getInstance());
         int amp = mexdao.getChannel().indexOf('&');
         String groupId = mexdao.getChannel().substring(0, amp);
         int idx = Integer.valueOf(mexdao.getChannel().substring(amp + 1));
@@ -287,10 +289,7 @@
         BpelInstanceWorker worker = 
_instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
         assert worker.isWorkerThread();
 
-//      TODO: we need a way to check if the lastBRC is indeed the lastBRC 
(serial number on the instanceDAO)
-//        BpelRuntimeContextImpl brc = lastBRC == null ? new 
BpelRuntimeContextImpl(worker, mexdao.getInstance(), null, null)
-//        : new BpelRuntimeContextImpl(worker, mexdao.getInstance(), lastBRC);
-        BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, 
mexdao.getInstance(), null, null);
+        BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, 
mexdao.getInstance());
 
         brc.injectPartnerResponse(mexdao.getMessageExchangeId(), 
mexdao.getChannel());
         brc.execute();
@@ -480,7 +479,7 @@
             return;
         }
 
-        BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, 
instanceDAO, null, null);
+        BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, 
instanceDAO);
         switch (we.getType()) {
         case TIMER:
             if (__log.isDebugEnabled()) {
@@ -1137,6 +1136,13 @@
         } finally {
             _hydrationLatch.release(1);
         }
+    }
+
+    public void scheduleWorkEvent(WorkEvent we, Date timeToFire) {
+//        if (isInMemory())
+//            throw new InvalidProcessException("In-mem process execution 
resulted in event scheduling.");
+        
+        _contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
     }
 
 }

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 Tue Aug  7 11:06:05 2007
@@ -131,29 +131,22 @@
 
     private boolean _executed;
 
-    /**
-     * Construct a BRC using the soup from the previous BRC. This is handy as 
it allows us to eliminate the DB read of the soup,
-     * when we know the soup has not changed since the last TX.
-     * 
-     * @param instanceWorker
-     * @param instanceDao
-     * @param lastBRC
-     */
-    BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, 
ProcessInstanceDAO instanceDao, BpelRuntimeContextImpl lastBRC) {
-        this(instanceWorker, instanceDao, lastBRC._soup);
-    }
-
-    BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, 
ProcessInstanceDAO dao, PROCESS PROCESS,
-            MessageExchangeDAO instantiatingMessageExchange) {
-
+    BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, 
ProcessInstanceDAO dao) {
         this(instanceWorker, dao, new ExecutionQueueImpl(null));
-        _instantiatingMessageExchange = instantiatingMessageExchange;
 
-        _soup.setReplacementMap(_bpelProcess.getReplacementMap());
-        _soup.setGlobalData(new OutstandingRequestManager());
-        byte[] daoState = _bpelProcess.isInMemory() ? null : 
dao.getExecutionState();
-        if (daoState != null) {
-            assert !_bpelProcess.isInMemory() : "did not expect to rehydrate 
in-mem process!";
+        // The following allows us to skip deserialization of the soup if our 
execution state in memory is the same
+        // as that in the database.
+        Object cachedState = 
instanceWorker.getCachedState(dao.getExecutionStateCounter());
+        if (cachedState != null) {
+            if (__log.isDebugEnabled())
+                __log.debug("CACHE HIT: Using cached state #" + 
dao.getExecutionStateCounter() + " to resume instance " + dao.getInstanceId());
+            _soup = (ExecutionQueueImpl) cachedState; 
+            _soup.setReplacementMap(_bpelProcess.getReplacementMap());
+            _vpu.setContext(_soup);
+        } else {
+            if (__log.isDebugEnabled())
+                __log.debug("CACHE MISS: Loading state to resume instance " + 
dao.getInstanceId() + " from database ");
+            byte[] daoState = dao.getExecutionState();
             ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
             try {
                 _soup.read(iis);
@@ -161,9 +154,20 @@
                 throw new RuntimeException(ex);
             }
         }
-        if (PROCESS != null) {
-            _vpu.inject(PROCESS);
-        }
+    }
+
+    BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, 
ProcessInstanceDAO dao, PROCESS PROCESS,
+            MessageExchangeDAO instantiatingMessageExchange) {
+
+        this(instanceWorker, dao, new ExecutionQueueImpl(null));
+
+        if (PROCESS == null)
+            throw new NullPointerException();
+        if (instantiatingMessageExchange == null)
+            throw new NullPointerException();
+        _soup.setGlobalData(new OutstandingRequestManager());
+        _instantiatingMessageExchange = instantiatingMessageExchange;
+        _vpu.inject(PROCESS);
 
     }
 
@@ -176,6 +180,7 @@
         _vpu = new JacobVPU();
         _vpu.registerExtension(BpelRuntimeContext.class, this);
         _soup = soup;
+        _soup.setReplacementMap(_bpelProcess.getReplacementMap());
         _vpu.setContext(_soup);
         if (BpelProcess.__log.isDebugEnabled()) {
             __log.debug("BpelRuntimeContextImpl created for instance " + _iid 
+ ". INDEXED STATE=" + _soup.getIndex());
@@ -320,9 +325,6 @@
             _dao.setState(ProcessState.STATE_READY);
             evt.setNewState(ProcessState.STATE_READY);
             sendEvent(evt);
-        } else if (_bpelProcess.isInMemory()) {
-            // This condition should be detected with static analysis, but 
just in case.
-            throw new InvalidProcessException("In-memory process must not 
receive additional messages.");
         }
 
         final String pickResponseChannelStr = pickResponseChannel.export();
@@ -660,30 +662,23 @@
     }
 
     public void registerTimer(TimerResponseChannel timerChannel, Date 
timeToFire) {
-
-        if (_bpelProcess.isInMemory())
-            throw new InvalidProcessException("Process not compatible with 
in-memory execution.");
-
         WorkEvent we = new WorkEvent();
         we.setIID(_dao.getInstanceId());
         we.setProcessId(_bpelProcess.getPID());
         we.setChannel(timerChannel.export());
         we.setType(WorkEvent.Type.TIMER);
-        _contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
+        _bpelProcess.scheduleWorkEvent(we, timeToFire);
     }
 
     private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKey 
key) {
 
-        if (_bpelProcess.isInMemory())
-            throw new InvalidProcessException("Process not compatible with 
in-memory execution.");
-
         WorkEvent we = new WorkEvent();
         we.setIID(_dao.getInstanceId());
         we.setProcessId(_bpelProcess.getPID());
         we.setType(WorkEvent.Type.MATCHER);
         we.setCorrelatorId(correlatorId);
         we.setCorrelationKey(key);
-        _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+        _bpelProcess.scheduleWorkEvent(we, null);
     }
 
     public String invoke(PartnerLinkInstance partnerLink, Operation operation, 
Element outgoingMessage,
@@ -893,34 +888,13 @@
 
         _dao.setLastActiveTime(new Date());
         if (!ProcessState.isFinished(_dao.getState())) {
-            if (__log.isDebugEnabled())
-                __log.debug("Setting execution state on instance " + _iid);
-            _soup.setGlobalData(getORM());
-
-            if (_bpelProcess.isInMemory()) {
-                // don't serialize in-memory processes
-                ((ProcessInstanceDaoImpl) _dao).setSoup(_soup);
-            } else {
-                ByteArrayOutputStream bos = new ByteArrayOutputStream(10000);
-                try {
-                    _soup.write(bos);
-                    bos.close();
-                } catch (Exception ex) {
-                    throw new RuntimeException(ex);
-                }
-                _dao.setExecutionState(bos.toByteArray());
-            }
+            saveState();
 
             if (ProcessState.canExecute(_dao.getState()) && canReduce) {
                 // Max time exceeded (possibly an infinite loop).
                 if (__log.isDebugEnabled())
                     __log.debug("MaxTime exceeded for instance # " + _iid);
 
-                // NOTE: we never ever schedule anything for in-mem processes, 
they have to finish in a single
-                // go.
-                if (_bpelProcess.isInMemory())
-                    throw new BpelEngineException("In-memory process 
timeout.");
-
                 try {
                     WorkEvent we = new WorkEvent();
                     we.setIID(_iid);
@@ -935,6 +909,21 @@
         }
     }
 
+    private void saveState() {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(10000);
+        try {
+            _soup.write(bos);
+            bos.close();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+
+        int newcount = _dao.getExecutionStateCounter() + 1;
+        _dao.setExecutionStateCounter(newcount);
+        _dao.setExecutionState(bos.toByteArray());
+        _instanceWorker.setCachedState(newcount, _soup);
+    }
+
     void inputMsgMatch(final String responsechannel, final int idx, 
MessageExchangeDAO mexdao) {
         // if we have a message match, this instance should be marked
         // active if it isn't already
@@ -951,7 +940,6 @@
             evt.setNewState(ProcessState.STATE_ACTIVE);
             sendEvent(evt);
         }
-        
 
         getORM().associate(responsechannel, mexdao.getMessageExchangeId());
 
@@ -1357,7 +1345,6 @@
     }
 
     private void scheduleReliableResponse(MessageExchangeDAO messageExchange) {
-        assert !_bpelProcess.isInMemory() : "Internal error; attempt to 
schedule in-memory process";
         assert _contexts.isTransacted();
 
         WorkEvent we = new WorkEvent();
@@ -1365,7 +1352,7 @@
         we.setMexId(messageExchange.getMessageExchangeId());
         we.setProcessId(_bpelProcess.getPID());
         we.setType(WorkEvent.Type.MYROLE_INVOKE_ASYNC_RESPONSE);
-        _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+        _bpelProcess.scheduleWorkEvent(we, null);
     }
 
     /**

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
 Tue Aug  7 11:06:05 2007
@@ -37,7 +37,6 @@
 import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchange.AckType;
 import org.apache.ode.bpel.o.OPartnerLink;
 import org.apache.ode.utils.msg.MessageBundle;
 import org.w3c.dom.Element;
@@ -112,7 +111,7 @@
     private volatile int _syncdummy;
 
     enum Change {
-        EPR, RESPONSE, RELEASE, REQUEST
+        EPR, ACK, RELEASE, REQUEST
     }
 
     final HashSet<Change> _changes = new HashSet<Change>();
@@ -180,19 +179,9 @@
         dao.setFailureType(_failureType == null ? null : 
_failureType.toString());
         dao.setAckType(_ackType);
 
-        if (_changes.contains(Change.REQUEST)) {
-            MessageDAO requestDao = dao.createMessage(_request.getType());
-            requestDao.setData(_request.getMessage());   
-            dao.setRequest(requestDao);
-        }
-        
-        if (_changes.contains(Change.RESPONSE)) {
-            MessageDAO responseDao = dao.createMessage(_response.getType());
-            responseDao.setData(_response.getMessage());
-            dao.setResponse(responseDao);
-        }
-
+       
         if (_changes.contains(Change.EPR)) {
+            _changes.remove(_epr);
             if (_epr != null)
                 dao.setEPR(_epr.toXML().getDocumentElement());
             else

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
 Tue Aug  7 11:06:05 2007
@@ -10,6 +10,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.engine.MessageExchangeImpl.Change;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
@@ -54,6 +55,14 @@
         dao.setCorrelationStatus(_cstatus == null ? null : 
_cstatus.toString());
         dao.setCorrelationId(_clientId);
         dao.setCallee(_callee);
+        
+        if (_changes.contains(Change.REQUEST)) {
+            _changes.remove(Change.REQUEST);
+            MessageDAO requestDao = dao.createMessage(_request.getType());
+            requestDao.setData(_request.getMessage());   
+            dao.setRequest(requestDao);
+        }
+        
     }
 
     public String getClientId() {

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
 Tue Aug  7 11:06:05 2007
@@ -180,6 +180,7 @@
                     .getInstance().getInstanceId(), 
mexDao.getMessageExchangeId(), _plinkDef, operation, partnerEpr, myRoleEpr,
                     _channel);
 
+
             // Need to cheat a little bit for in-memory processes; do the 
invoke in-line, but first suspend
             // the transaction so that the IL does not get confused.
             Transaction tx;
@@ -189,15 +190,22 @@
             } catch (Exception ex) {
                 throw new BpelEngineException("TxManager Error: cannot 
suspend!", ex);
             }
-
+            
+            unreliableMex.request();
+            unreliableMex.setState(State.INVOKE_XXX);
             try {
-                unreliableMex.setState(State.INVOKE_XXX);
-                _contexts.mexContext.invokePartnerUnreliable(unreliableMex);
+                try {
+                    
_contexts.mexContext.invokePartnerUnreliable(unreliableMex);
+                } catch (Throwable t) {
+                    __log.error("Unexpected error invoking partner." ,t);
+                    MexDaoUtil.setFailed(mexDao, FailureType.OTHER, 
t.toString());
+                    return;
+                }
+                
                 try {
                     unreliableMex.waitForAck(mexDao.getTimeout());
                 } catch (InterruptedException ie) {
                     __log.warn("Interrupted waiting for MEX response.");
-
                 }
 
             } finally {
@@ -226,6 +234,8 @@
         ReliablePartnerRoleMessageExchangeImpl reliableMex = new 
ReliablePartnerRoleMessageExchangeImpl(_process, mexDao
                 .getInstance().getInstanceId(), mexDao.getMessageExchangeId(), 
_plinkDef, operation, partnerEpr, myRoleEpr,
                 _channel);
+        
+        reliableMex.request();
         reliableMex.setState(State.INVOKE_XXX);
         Throwable err = null;
         try {
@@ -255,6 +265,8 @@
         TransactedPartnerRoleMessageExchangeImpl transactedMex = new 
TransactedPartnerRoleMessageExchangeImpl(_process, mexDao
                 .getInstance().getInstanceId(), mexDao.getMessageExchangeId(), 
_plinkDef, operation, partnerEpr, myRoleEpr,
                 _channel);
+        
+        transactedMex.request();
         transactedMex.setState(State.INVOKE_XXX);
         try {
             _contexts.mexContext.invokePartnerTransacted(transactedMex);

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
 Tue Aug  7 11:06:05 2007
@@ -29,6 +29,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.EndpointReference;
@@ -38,6 +39,7 @@
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 import org.apache.ode.bpel.o.OPartnerLink;
+import org.apache.xmlbeans.XmlCursor.ChangeStamp;
 import org.w3c.dom.Element;
 
 import com.sun.corba.se.spi.activation._ActivatorImplBase;
@@ -98,6 +100,15 @@
     @Override
     void save(MessageExchangeDAO dao) {
         super.save(dao);
+        
+        if (_changes.contains(Change.ACK)) {
+            _changes.remove(Change.ACK);
+            MessageDAO responseDao = dao.createMessage(_response.getType());
+            responseDao.setData(_response.getMessage());
+            dao.setResponse(responseDao);
+        }
+
+
     }
 
     @Override
@@ -105,6 +116,7 @@
         _accessLock.lock();
         try {
             super.ack(acktype);
+            _changes.add(Change.ACK);
             _acked.signalAll();
         } finally {
             _accessLock.unlock();
@@ -234,7 +246,7 @@
             throw new IllegalStateException("Object used in inappropriate 
context. ");
 
         if (getStatus() != MessageExchange.Status.REQ)
-            throw new IllegalStateException("Invalid message exchange state, 
expect REQUEST or ASYNC, but got " + getStatus());
+            throw new IllegalStateException("Invalid message exchange state, 
expect REQ but got " + getStatus());
 
     }
 

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
 Tue Aug  7 11:06:05 2007
@@ -17,7 +17,7 @@
 import org.apache.ode.bpel.o.OPartnerLink;
 
 /**
- * For invoking the engine using ASYNC style.
+ * For invoking the engine using UNRELIABLE style.
  * 
  * @author Maciej Szefler <mszefler at gmail dot com>
  * 
@@ -36,7 +36,6 @@
      * Override the setStatus(...) to notify our future when there is a 
response/failure.
      */
     protected void ack(AckType acktype) {
-        Status old = getStatus();
         super.ack(acktype);
         if (_future != null) {
             _future.done(Status.ACK);

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
 Tue Aug  7 11:06:05 2007
@@ -45,33 +45,52 @@
 import java.util.Set;
 
 /**
- * A very simple, in-memory implementation of the [EMAIL PROTECTED] 
ProcessInstanceDAO}
- * interface.
+ * A very simple, in-memory implementation of the [EMAIL PROTECTED] 
ProcessInstanceDAO} interface.
  */
 public class ProcessInstanceDaoImpl extends DaoBaseImpl implements 
ProcessInstanceDAO {
     private static final Collection<ScopeDAO> EMPTY_SCOPE_DAOS = 
Collections.emptyList();
 
     private short _previousState;
+
     private short _state;
+
     private Long _instanceId;
+
     private ProcessDaoImpl _processDao;
+
     private Object _soup;
+
     private Map<Long, ScopeDAO> _scopes = new HashMap<Long, ScopeDAO>();
+
     private Map<String, List<ScopeDAO>> _scopesByName = new HashMap<String, 
List<ScopeDAO>>();
+
     private Map<String, byte[]> _messageExchanges = new HashMap<String, 
byte[]>();
+
     private ScopeDAO _rootScope;
+
     private FaultDAO _fault;
+
     private CorrelatorDAO _instantiatingCorrelator;
+
     private BpelDAOConnection _conn;
+
     private int _failureCount;
+
     private Date _failureDateTime;
+
     private Map<String, ActivityRecoveryDAO> _activityRecoveries = new 
HashMap<String, ActivityRecoveryDAO>();
 
     // TODO: Remove this, we should be using the main event store...
     private List<ProcessInstanceEvent> _events = new 
ArrayList<ProcessInstanceEvent>();
+
     private Date _lastActive;
+
     private int _seq;
 
+    private byte[] _execState;
+
+    private int _execStateCount;
+
     ProcessInstanceDaoImpl(BpelDAOConnection conn, ProcessDaoImpl processDao, 
CorrelatorDAO correlator) {
         _state = 0;
         _processDao = processDao;
@@ -125,11 +144,11 @@
      * @see ProcessInstanceDAO#getExecutionState()
      */
     public byte[] getExecutionState() {
-        throw new IllegalStateException("In-memory instances are never 
serialized");
+        return _execState;
     }
 
     public void setExecutionState(byte[] bytes) {
-        throw new IllegalStateException("In-memory instances are never 
serialized");
+        _execState = bytes;
     }
 
     public Object getSoup() {
@@ -314,7 +333,7 @@
     }
 
     public void createActivityRecovery(String channel, long activityId, String 
reason, Date dateTime, Element data,
-                                       String[] actions, int retries) {
+            String[] actions, int retries) {
         _activityRecoveries
                 .put(channel, new ActivityRecoveryDAOImpl(channel, activityId, 
reason, dateTime, data, actions, retries));
         _failureCount = _activityRecoveries.size();
@@ -347,7 +366,7 @@
         private int _retries;
 
         ActivityRecoveryDAOImpl(String channel, long activityId, String 
reason, Date dateTime, Element details, String[] actions,
-                                int retries) {
+                int retries) {
             _activityId = activityId;
             _channel = channel;
             _reason = reason;
@@ -404,5 +423,13 @@
 
     public String toString() {
         return "mem.instance(type=" + _processDao.getType() + " iid=" + 
_instanceId + ")";
+    }
+
+    public int getExecutionStateCounter() {
+        return _execStateCount;
+    }
+
+    public void setExecutionStateCounter(int stateCounter) {
+        _execStateCount = stateCounter;
     }
 }


Reply via email to