Author: mszefler
Date: Tue Aug 7 11:06:05 2007
New Revision: 563599
URL: http://svn.apache.org/viewvc?view=rev&rev=563599
Log:
BART tweaks: open up scheduling for in-memory processes to assist testing.
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
Tue Aug 7 11:06:05 2007
@@ -34,6 +34,8 @@
private Thread _workerThread;
+ private CachedState _cachedState;
+
BpelInstanceWorker(BpelProcess process, Long iid) {
_process = process;
_iid = iid;
@@ -154,8 +156,29 @@
return "{BpelInstanceWorker: PID=" + _process.getPID() + " IID=" +
_iid + " workerThread="+ _workerThread + "}";
}
- public boolean isWorkerThread() {
+ boolean isWorkerThread() {
return _activeInstance.get() != null;
+ }
+
+ Object getCachedState(Object uuid) {
+ CachedState cs = _cachedState;
+ if (cs != null && cs.uuid.equals(uuid))
+ return cs.state;
+ return null;
+ }
+
+ void setCachedState(Object uuid, Object state) {
+ _cachedState = new CachedState(uuid, state);
+ }
+
+ private class CachedState {
+ final Object uuid;
+ final Object state;
+
+ CachedState(Object uuid, Object state) {
+ this.uuid = uuid;
+ this.state = state;
+ }
}
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Tue Aug 7 11:06:05 2007
@@ -22,6 +22,7 @@
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -67,6 +68,7 @@
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.bpel.o.Serializer;
import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
+import org.apache.ode.bpel.runtime.InvalidProcessException;
import org.apache.ode.bpel.runtime.PROCESS;
import org.apache.ode.bpel.runtime.PropertyAliasEvaluationContext;
import org.apache.ode.bpel.runtime.channels.FaultData;
@@ -274,7 +276,7 @@
BpelInstanceWorker worker =
_instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
assert worker.isWorkerThread();
- BpelRuntimeContextImpl instance = new BpelRuntimeContextImpl(worker,
mexdao.getInstance(), null, null);
+ BpelRuntimeContextImpl instance = new BpelRuntimeContextImpl(worker,
mexdao.getInstance());
int amp = mexdao.getChannel().indexOf('&');
String groupId = mexdao.getChannel().substring(0, amp);
int idx = Integer.valueOf(mexdao.getChannel().substring(amp + 1));
@@ -287,10 +289,7 @@
BpelInstanceWorker worker =
_instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
assert worker.isWorkerThread();
-// TODO: we need a way to check if the lastBRC is indeed the lastBRC
(serial number on the instanceDAO)
-// BpelRuntimeContextImpl brc = lastBRC == null ? new
BpelRuntimeContextImpl(worker, mexdao.getInstance(), null, null)
-// : new BpelRuntimeContextImpl(worker, mexdao.getInstance(), lastBRC);
- BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker,
mexdao.getInstance(), null, null);
+ BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker,
mexdao.getInstance());
brc.injectPartnerResponse(mexdao.getMessageExchangeId(),
mexdao.getChannel());
brc.execute();
@@ -480,7 +479,7 @@
return;
}
- BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker,
instanceDAO, null, null);
+ BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker,
instanceDAO);
switch (we.getType()) {
case TIMER:
if (__log.isDebugEnabled()) {
@@ -1137,6 +1136,13 @@
} finally {
_hydrationLatch.release(1);
}
+ }
+
+ public void scheduleWorkEvent(WorkEvent we, Date timeToFire) {
+// if (isInMemory())
+// throw new InvalidProcessException("In-mem process execution
resulted in event scheduling.");
+
+ _contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
}
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
Tue Aug 7 11:06:05 2007
@@ -131,29 +131,22 @@
private boolean _executed;
- /**
- * Construct a BRC using the soup from the previous BRC. This is handy as
it allows us to eliminate the DB read of the soup,
- * when we know the soup has not changed since the last TX.
- *
- * @param instanceWorker
- * @param instanceDao
- * @param lastBRC
- */
- BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker,
ProcessInstanceDAO instanceDao, BpelRuntimeContextImpl lastBRC) {
- this(instanceWorker, instanceDao, lastBRC._soup);
- }
-
- BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker,
ProcessInstanceDAO dao, PROCESS PROCESS,
- MessageExchangeDAO instantiatingMessageExchange) {
-
+ BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker,
ProcessInstanceDAO dao) {
this(instanceWorker, dao, new ExecutionQueueImpl(null));
- _instantiatingMessageExchange = instantiatingMessageExchange;
- _soup.setReplacementMap(_bpelProcess.getReplacementMap());
- _soup.setGlobalData(new OutstandingRequestManager());
- byte[] daoState = _bpelProcess.isInMemory() ? null :
dao.getExecutionState();
- if (daoState != null) {
- assert !_bpelProcess.isInMemory() : "did not expect to rehydrate
in-mem process!";
+ // The following allows us to skip deserialization of the soup if our
execution state in memory is the same
+ // as that in the database.
+ Object cachedState =
instanceWorker.getCachedState(dao.getExecutionStateCounter());
+ if (cachedState != null) {
+ if (__log.isDebugEnabled())
+ __log.debug("CACHE HIT: Using cached state #" +
dao.getExecutionStateCounter() + " to resume instance " + dao.getInstanceId());
+ _soup = (ExecutionQueueImpl) cachedState;
+ _soup.setReplacementMap(_bpelProcess.getReplacementMap());
+ _vpu.setContext(_soup);
+ } else {
+ if (__log.isDebugEnabled())
+ __log.debug("CACHE MISS: Loading state to resume instance " +
dao.getInstanceId() + " from database ");
+ byte[] daoState = dao.getExecutionState();
ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
try {
_soup.read(iis);
@@ -161,9 +154,20 @@
throw new RuntimeException(ex);
}
}
- if (PROCESS != null) {
- _vpu.inject(PROCESS);
- }
+ }
+
+ BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker,
ProcessInstanceDAO dao, PROCESS PROCESS,
+ MessageExchangeDAO instantiatingMessageExchange) {
+
+ this(instanceWorker, dao, new ExecutionQueueImpl(null));
+
+ if (PROCESS == null)
+ throw new NullPointerException();
+ if (instantiatingMessageExchange == null)
+ throw new NullPointerException();
+ _soup.setGlobalData(new OutstandingRequestManager());
+ _instantiatingMessageExchange = instantiatingMessageExchange;
+ _vpu.inject(PROCESS);
}
@@ -176,6 +180,7 @@
_vpu = new JacobVPU();
_vpu.registerExtension(BpelRuntimeContext.class, this);
_soup = soup;
+ _soup.setReplacementMap(_bpelProcess.getReplacementMap());
_vpu.setContext(_soup);
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("BpelRuntimeContextImpl created for instance " + _iid
+ ". INDEXED STATE=" + _soup.getIndex());
@@ -320,9 +325,6 @@
_dao.setState(ProcessState.STATE_READY);
evt.setNewState(ProcessState.STATE_READY);
sendEvent(evt);
- } else if (_bpelProcess.isInMemory()) {
- // This condition should be detected with static analysis, but
just in case.
- throw new InvalidProcessException("In-memory process must not
receive additional messages.");
}
final String pickResponseChannelStr = pickResponseChannel.export();
@@ -660,30 +662,23 @@
}
public void registerTimer(TimerResponseChannel timerChannel, Date
timeToFire) {
-
- if (_bpelProcess.isInMemory())
- throw new InvalidProcessException("Process not compatible with
in-memory execution.");
-
WorkEvent we = new WorkEvent();
we.setIID(_dao.getInstanceId());
we.setProcessId(_bpelProcess.getPID());
we.setChannel(timerChannel.export());
we.setType(WorkEvent.Type.TIMER);
- _contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
+ _bpelProcess.scheduleWorkEvent(we, timeToFire);
}
private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKey
key) {
- if (_bpelProcess.isInMemory())
- throw new InvalidProcessException("Process not compatible with
in-memory execution.");
-
WorkEvent we = new WorkEvent();
we.setIID(_dao.getInstanceId());
we.setProcessId(_bpelProcess.getPID());
we.setType(WorkEvent.Type.MATCHER);
we.setCorrelatorId(correlatorId);
we.setCorrelationKey(key);
- _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+ _bpelProcess.scheduleWorkEvent(we, null);
}
public String invoke(PartnerLinkInstance partnerLink, Operation operation,
Element outgoingMessage,
@@ -893,34 +888,13 @@
_dao.setLastActiveTime(new Date());
if (!ProcessState.isFinished(_dao.getState())) {
- if (__log.isDebugEnabled())
- __log.debug("Setting execution state on instance " + _iid);
- _soup.setGlobalData(getORM());
-
- if (_bpelProcess.isInMemory()) {
- // don't serialize in-memory processes
- ((ProcessInstanceDaoImpl) _dao).setSoup(_soup);
- } else {
- ByteArrayOutputStream bos = new ByteArrayOutputStream(10000);
- try {
- _soup.write(bos);
- bos.close();
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- _dao.setExecutionState(bos.toByteArray());
- }
+ saveState();
if (ProcessState.canExecute(_dao.getState()) && canReduce) {
// Max time exceeded (possibly an infinite loop).
if (__log.isDebugEnabled())
__log.debug("MaxTime exceeded for instance # " + _iid);
- // NOTE: we never ever schedule anything for in-mem processes,
they have to finish in a single
- // go.
- if (_bpelProcess.isInMemory())
- throw new BpelEngineException("In-memory process
timeout.");
-
try {
WorkEvent we = new WorkEvent();
we.setIID(_iid);
@@ -935,6 +909,21 @@
}
}
+ private void saveState() {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(10000);
+ try {
+ _soup.write(bos);
+ bos.close();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+
+ int newcount = _dao.getExecutionStateCounter() + 1;
+ _dao.setExecutionStateCounter(newcount);
+ _dao.setExecutionState(bos.toByteArray());
+ _instanceWorker.setCachedState(newcount, _soup);
+ }
+
void inputMsgMatch(final String responsechannel, final int idx,
MessageExchangeDAO mexdao) {
// if we have a message match, this instance should be marked
// active if it isn't already
@@ -951,7 +940,6 @@
evt.setNewState(ProcessState.STATE_ACTIVE);
sendEvent(evt);
}
-
getORM().associate(responsechannel, mexdao.getMessageExchangeId());
@@ -1357,7 +1345,6 @@
}
private void scheduleReliableResponse(MessageExchangeDAO messageExchange) {
- assert !_bpelProcess.isInMemory() : "Internal error; attempt to
schedule in-memory process";
assert _contexts.isTransacted();
WorkEvent we = new WorkEvent();
@@ -1365,7 +1352,7 @@
we.setMexId(messageExchange.getMessageExchangeId());
we.setProcessId(_bpelProcess.getPID());
we.setType(WorkEvent.Type.MYROLE_INVOKE_ASYNC_RESPONSE);
- _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+ _bpelProcess.scheduleWorkEvent(we, null);
}
/**
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
Tue Aug 7 11:06:05 2007
@@ -37,7 +37,6 @@
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchange.AckType;
import org.apache.ode.bpel.o.OPartnerLink;
import org.apache.ode.utils.msg.MessageBundle;
import org.w3c.dom.Element;
@@ -112,7 +111,7 @@
private volatile int _syncdummy;
enum Change {
- EPR, RESPONSE, RELEASE, REQUEST
+ EPR, ACK, RELEASE, REQUEST
}
final HashSet<Change> _changes = new HashSet<Change>();
@@ -180,19 +179,9 @@
dao.setFailureType(_failureType == null ? null :
_failureType.toString());
dao.setAckType(_ackType);
- if (_changes.contains(Change.REQUEST)) {
- MessageDAO requestDao = dao.createMessage(_request.getType());
- requestDao.setData(_request.getMessage());
- dao.setRequest(requestDao);
- }
-
- if (_changes.contains(Change.RESPONSE)) {
- MessageDAO responseDao = dao.createMessage(_response.getType());
- responseDao.setData(_response.getMessage());
- dao.setResponse(responseDao);
- }
-
+
if (_changes.contains(Change.EPR)) {
+ _changes.remove(_epr);
if (_epr != null)
dao.setEPR(_epr.toXML().getDocumentElement());
else
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Tue Aug 7 11:06:05 2007
@@ -10,6 +10,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.MessageDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.engine.MessageExchangeImpl.Change;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
@@ -54,6 +55,14 @@
dao.setCorrelationStatus(_cstatus == null ? null :
_cstatus.toString());
dao.setCorrelationId(_clientId);
dao.setCallee(_callee);
+
+ if (_changes.contains(Change.REQUEST)) {
+ _changes.remove(Change.REQUEST);
+ MessageDAO requestDao = dao.createMessage(_request.getType());
+ requestDao.setData(_request.getMessage());
+ dao.setRequest(requestDao);
+ }
+
}
public String getClientId() {
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
Tue Aug 7 11:06:05 2007
@@ -180,6 +180,7 @@
.getInstance().getInstanceId(),
mexDao.getMessageExchangeId(), _plinkDef, operation, partnerEpr, myRoleEpr,
_channel);
+
// Need to cheat a little bit for in-memory processes; do the
invoke in-line, but first suspend
// the transaction so that the IL does not get confused.
Transaction tx;
@@ -189,15 +190,22 @@
} catch (Exception ex) {
throw new BpelEngineException("TxManager Error: cannot
suspend!", ex);
}
-
+
+ unreliableMex.request();
+ unreliableMex.setState(State.INVOKE_XXX);
try {
- unreliableMex.setState(State.INVOKE_XXX);
- _contexts.mexContext.invokePartnerUnreliable(unreliableMex);
+ try {
+
_contexts.mexContext.invokePartnerUnreliable(unreliableMex);
+ } catch (Throwable t) {
+ __log.error("Unexpected error invoking partner." ,t);
+ MexDaoUtil.setFailed(mexDao, FailureType.OTHER,
t.toString());
+ return;
+ }
+
try {
unreliableMex.waitForAck(mexDao.getTimeout());
} catch (InterruptedException ie) {
__log.warn("Interrupted waiting for MEX response.");
-
}
} finally {
@@ -226,6 +234,8 @@
ReliablePartnerRoleMessageExchangeImpl reliableMex = new
ReliablePartnerRoleMessageExchangeImpl(_process, mexDao
.getInstance().getInstanceId(), mexDao.getMessageExchangeId(),
_plinkDef, operation, partnerEpr, myRoleEpr,
_channel);
+
+ reliableMex.request();
reliableMex.setState(State.INVOKE_XXX);
Throwable err = null;
try {
@@ -255,6 +265,8 @@
TransactedPartnerRoleMessageExchangeImpl transactedMex = new
TransactedPartnerRoleMessageExchangeImpl(_process, mexDao
.getInstance().getInstanceId(), mexDao.getMessageExchangeId(),
_plinkDef, operation, partnerEpr, myRoleEpr,
_channel);
+
+ transactedMex.request();
transactedMex.setState(State.INVOKE_XXX);
try {
_contexts.mexContext.invokePartnerTransacted(transactedMex);
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
Tue Aug 7 11:06:05 2007
@@ -29,6 +29,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.EndpointReference;
@@ -38,6 +39,7 @@
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
import org.apache.ode.bpel.o.OPartnerLink;
+import org.apache.xmlbeans.XmlCursor.ChangeStamp;
import org.w3c.dom.Element;
import com.sun.corba.se.spi.activation._ActivatorImplBase;
@@ -98,6 +100,15 @@
@Override
void save(MessageExchangeDAO dao) {
super.save(dao);
+
+ if (_changes.contains(Change.ACK)) {
+ _changes.remove(Change.ACK);
+ MessageDAO responseDao = dao.createMessage(_response.getType());
+ responseDao.setData(_response.getMessage());
+ dao.setResponse(responseDao);
+ }
+
+
}
@Override
@@ -105,6 +116,7 @@
_accessLock.lock();
try {
super.ack(acktype);
+ _changes.add(Change.ACK);
_acked.signalAll();
} finally {
_accessLock.unlock();
@@ -234,7 +246,7 @@
throw new IllegalStateException("Object used in inappropriate
context. ");
if (getStatus() != MessageExchange.Status.REQ)
- throw new IllegalStateException("Invalid message exchange state,
expect REQUEST or ASYNC, but got " + getStatus());
+ throw new IllegalStateException("Invalid message exchange state,
expect REQ but got " + getStatus());
}
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
Tue Aug 7 11:06:05 2007
@@ -17,7 +17,7 @@
import org.apache.ode.bpel.o.OPartnerLink;
/**
- * For invoking the engine using ASYNC style.
+ * For invoking the engine using UNRELIABLE style.
*
* @author Maciej Szefler <mszefler at gmail dot com>
*
@@ -36,7 +36,6 @@
* Override the setStatus(...) to notify our future when there is a
response/failure.
*/
protected void ack(AckType acktype) {
- Status old = getStatus();
super.ack(acktype);
if (_future != null) {
_future.done(Status.ACK);
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java?view=diff&rev=563599&r1=563598&r2=563599
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
Tue Aug 7 11:06:05 2007
@@ -45,33 +45,52 @@
import java.util.Set;
/**
- * A very simple, in-memory implementation of the [EMAIL PROTECTED]
ProcessInstanceDAO}
- * interface.
+ * A very simple, in-memory implementation of the [EMAIL PROTECTED]
ProcessInstanceDAO} interface.
*/
public class ProcessInstanceDaoImpl extends DaoBaseImpl implements
ProcessInstanceDAO {
private static final Collection<ScopeDAO> EMPTY_SCOPE_DAOS =
Collections.emptyList();
private short _previousState;
+
private short _state;
+
private Long _instanceId;
+
private ProcessDaoImpl _processDao;
+
private Object _soup;
+
private Map<Long, ScopeDAO> _scopes = new HashMap<Long, ScopeDAO>();
+
private Map<String, List<ScopeDAO>> _scopesByName = new HashMap<String,
List<ScopeDAO>>();
+
private Map<String, byte[]> _messageExchanges = new HashMap<String,
byte[]>();
+
private ScopeDAO _rootScope;
+
private FaultDAO _fault;
+
private CorrelatorDAO _instantiatingCorrelator;
+
private BpelDAOConnection _conn;
+
private int _failureCount;
+
private Date _failureDateTime;
+
private Map<String, ActivityRecoveryDAO> _activityRecoveries = new
HashMap<String, ActivityRecoveryDAO>();
// TODO: Remove this, we should be using the main event store...
private List<ProcessInstanceEvent> _events = new
ArrayList<ProcessInstanceEvent>();
+
private Date _lastActive;
+
private int _seq;
+ private byte[] _execState;
+
+ private int _execStateCount;
+
ProcessInstanceDaoImpl(BpelDAOConnection conn, ProcessDaoImpl processDao,
CorrelatorDAO correlator) {
_state = 0;
_processDao = processDao;
@@ -125,11 +144,11 @@
* @see ProcessInstanceDAO#getExecutionState()
*/
public byte[] getExecutionState() {
- throw new IllegalStateException("In-memory instances are never
serialized");
+ return _execState;
}
public void setExecutionState(byte[] bytes) {
- throw new IllegalStateException("In-memory instances are never
serialized");
+ _execState = bytes;
}
public Object getSoup() {
@@ -314,7 +333,7 @@
}
public void createActivityRecovery(String channel, long activityId, String
reason, Date dateTime, Element data,
- String[] actions, int retries) {
+ String[] actions, int retries) {
_activityRecoveries
.put(channel, new ActivityRecoveryDAOImpl(channel, activityId,
reason, dateTime, data, actions, retries));
_failureCount = _activityRecoveries.size();
@@ -347,7 +366,7 @@
private int _retries;
ActivityRecoveryDAOImpl(String channel, long activityId, String
reason, Date dateTime, Element details, String[] actions,
- int retries) {
+ int retries) {
_activityId = activityId;
_channel = channel;
_reason = reason;
@@ -404,5 +423,13 @@
public String toString() {
return "mem.instance(type=" + _processDao.getType() + " iid=" +
_instanceId + ")";
+ }
+
+ public int getExecutionStateCounter() {
+ return _execStateCount;
+ }
+
+ public void setExecutionStateCounter(int stateCounter) {
+ _execStateCount = stateCounter;
}
}