Author: mriou
Date: Wed Dec 3 11:51:01 2008
New Revision: 723038
URL: http://svn.apache.org/viewvc?rev=723038&view=rev
Log:
More resource-oriented logic. Routing seems to work. Basic messaging activities
(receive and onMessage) are now resource-aware.
Modified:
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java
ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java
ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java
Modified:
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
(original)
+++
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
Wed Dec 3 11:51:01 2008
@@ -271,6 +271,8 @@
public void setExecutionStateCounter(int stateCounter);
+ Set<String> getAllResourceRoutes();
+
/**
* Transport object holding the date of the first and last instance event
along with the number events.
*/
Modified:
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
(original)
+++
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
Wed Dec 3 11:51:01 2008
@@ -446,4 +446,13 @@
resRoute.setInstance(_instance);
getSession().save(resRoute);
}
+
+ public Set<String> getAllResourceRoutes() {
+ Set<HResourceRoute> rr = _instance.getResourceRoutes();
+ HashSet<String> rs = new HashSet<String>();
+ for (HResourceRoute hResourceRoute : rr) {
+ rs.add(hResourceRoute.getUrl() + "~" + hResourceRoute.getMethod());
+ }
+ return rs;
+ }
}
Modified:
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
(original)
+++
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
Wed Dec 3 11:51:01 2008
@@ -50,6 +50,8 @@
private Set<HMessageExchange> _messageExchanges = new
HashSet<HMessageExchange>();
+ private Set<HResourceRoute> _resourceRoutes = new
HashSet<HResourceRoute>();
+
private HFaultData _fault;
private HLargeData _jacobState;
@@ -154,6 +156,19 @@
}
/**
+ * @hibernate.set lazy="true" inverse="true" cascade="delete"
+ * @hibernate.collection-key column="PIID"
+ * @hibernate.collection-one-to-many
class="org.apache.ode.daohib.bpel.hobj.HResourceRoute"
+ */
+ public Set<HResourceRoute> getResourceRoutes() {
+ return _resourceRoutes;
+ }
+
+ public void setResourceRoutes(Set<HResourceRoute> rroutes) {
+ _resourceRoutes = rroutes;
+ }
+
+ /**
* @hibernate.property column="PREVIOUS_STATE"
*/
public short getPreviousState() {
Modified:
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
(original)
+++
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
Wed Dec 3 11:51:01 2008
@@ -60,7 +60,7 @@
}
/**
- * @hibernate.many-to-one column="INSTANCE"
+ * @hibernate.many-to-one column="PIID"
*/
public HProcessInstance getInstance() {
return _instance;
Modified:
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
(original)
+++
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
Wed Dec 3 11:51:01 2008
@@ -20,16 +20,7 @@
package org.apache.ode.dao.jpa;
import org.apache.ode.bpel.common.ProcessState;
-import org.apache.ode.bpel.dao.ActivityRecoveryDAO;
-import org.apache.ode.bpel.dao.BpelDAOConnection;
-import org.apache.ode.bpel.dao.CorrelationSetDAO;
-import org.apache.ode.bpel.dao.CorrelatorDAO;
-import org.apache.ode.bpel.dao.FaultDAO;
-import org.apache.ode.bpel.dao.ProcessDAO;
-import org.apache.ode.bpel.dao.ProcessInstanceDAO;
-import org.apache.ode.bpel.dao.ScopeDAO;
-import org.apache.ode.bpel.dao.ScopeStateEnum;
-import org.apache.ode.bpel.dao.XmlDataDAO;
+import org.apache.ode.bpel.dao.*;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
import org.w3c.dom.Element;
@@ -94,6 +85,8 @@
private Collection<ScopeDAO> _scopes = new ArrayList<ScopeDAO>();
@OneToMany(targetEntity=ActivityRecoveryDAOImpl.class,mappedBy="_instance",fetch=FetchType.LAZY,cascade={CascadeType.ALL})
private Collection<ActivityRecoveryDAO> _recoveries = new
ArrayList<ActivityRecoveryDAO>();
+
@OneToMany(targetEntity=ResourceRouteDAOImpl.class,mappedBy="_instance",fetch=FetchType.LAZY,cascade={CascadeType.ALL})
+ private Collection<ResourceRouteDAO> _resourceRoutes = new
ArrayList<ResourceRouteDAO>();
@OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL})
@Column(name="FAULT_ID")
private FaultDAOImpl _fault;
@ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST})
@Column(name="PROCESS_ID")
@@ -158,7 +151,17 @@
}
public void createResourceRoute(String url, String method, String
pickResponseChannel, int selectorIdx) {
- new ResourceRouteDAOImpl(url, method, pickResponseChannel,
selectorIdx, this);
+ ResourceRouteDAOImpl rr = new ResourceRouteDAOImpl(url, method,
pickResponseChannel, selectorIdx, this);
+ rr.setInstance(this);
+ _resourceRoutes.add(rr);
+ }
+
+ public Set<String> getAllResourceRoutes() {
+ HashSet<String> rs = new HashSet<String>();
+ for (ResourceRouteDAO resourceRoute : _resourceRoutes) {
+ rs.add(resourceRoute.getUrl() + "~" + resourceRoute.getMethod());
+ }
+ return rs;
}
public void finishCompletion() {
Modified:
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
(original)
+++
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
Wed Dec 3 11:51:01 2008
@@ -21,10 +21,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
import javax.wsdl.Operation;
import javax.xml.namespace.QName;
@@ -148,6 +145,8 @@
_dao.setFault(faultData.getFaultName(), faultData.getExplanation(),
faultData.getFaultLineNo(),
faultData.getActivityId(), faultData.getFaultMessage());
+ cleanupResourceRoutes();
+
// send event
ProcessInstanceStateChangeEvent evt = new
ProcessInstanceStateChangeEvent();
evt.setOldState(_dao.getState());
@@ -164,6 +163,8 @@
ODEProcess.__log.debug("ProcessImpl " + _bpelProcess.getPID() + "
completed OK.");
}
+ cleanupResourceRoutes();
+
// send event
ProcessInstanceStateChangeEvent evt = new
ProcessInstanceStateChangeEvent();
evt.setOldState(_dao.getState());
@@ -319,6 +320,15 @@
// TODO schedule a matcher to see if the message arrived already
}
+ private void cleanupResourceRoutes() {
+ Set<String> routes = _dao.getAllResourceRoutes();
+ for (String route : routes) {
+ String[] resArr = route.split("~");
+ org.apache.ode.bpel.iapi.Resource res = new
org.apache.ode.bpel.iapi.Resource(resArr[0], "application/xml", resArr[1]);
+
_bpelProcess._contexts.bindingContext.deactivateProvidedResource(res);
+ }
+ }
+
public CorrelationKey readCorrelation(CorrelationSet cset) {
ScopeDAO scopeDAO = _dao.getScope(cset.getScopeId());
CorrelationSetDAO cs = scopeDAO.getCorrelationSet(cset.getName());
Modified:
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
(original)
+++
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
Wed Dec 3 11:51:01 2008
@@ -73,56 +73,62 @@
}
mexdao.setProcess(getProcessDAO());
- Resource instantiatingResource = getResource(mexdao.getResource());
- InvocationStyle istyle = mexdao.getInvocationStyle();
+ try {
+ Resource instantiatingResource = getResource(mexdao.getResource());
+ InvocationStyle istyle = mexdao.getInvocationStyle();
+
+ if (instantiatingResource != null) {
+ ProcessInstanceDAO newInstance =
getProcessDAO().createInstance(null);
+ newInstance.setInstantiatingUrl(mexdao.getResource());
+
+ // send process instance event
+ NewProcessInstanceEvent evt = new
NewProcessInstanceEvent(getProcessModel().getQName(),
+ getProcessDAO().getProcessId(),
newInstance.getInstanceId());
+ evt.setMexId(mexdao.getMessageExchangeId());
+ saveEvent(evt, newInstance);
- if (instantiatingResource != null) {
- ProcessInstanceDAO newInstance =
getProcessDAO().createInstance(null);
- newInstance.setInstantiatingUrl(mexdao.getResource());
-
- // send process instance event
- NewProcessInstanceEvent evt = new
NewProcessInstanceEvent(getProcessModel().getQName(),
- getProcessDAO().getProcessId(),
newInstance.getInstanceId());
- evt.setMexId(mexdao.getMessageExchangeId());
- saveEvent(evt, newInstance);
-
-
mexdao.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
- mexdao.setInstance(newInstance);
-
- doInstanceWork(mexdao.getInstance().getInstanceId(), new
Callable<Void>() {
- public Void call() {
- executeCreateInstance(mexdao);
- return null;
- }
- });
- } else {
- // TODO avoid reloading the resource routing, it's just been
loaded by the server on mex creation
- String[] urlMeth = mexdao.getResource().split("~");
- ResourceRouteDAO rr =
_contexts.dao.getConnection().getResourceRoute(urlMeth[0], urlMeth[1]);
- // This really should have been caught by the server
- if (rr == null) throw new BpelEngineException("NoSuchResource: " +
mexdao.getResource());
- mexdao.setInstance(rr.getInstance());
- mexdao.setChannel(rr.getPickResponseChannel() + "&" +
rr.getSelectorIdx());
+
mexdao.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
+ mexdao.setInstance(newInstance);
- if (istyle == InvocationStyle.TRANSACTED) {
doInstanceWork(mexdao.getInstance().getInstanceId(), new
Callable<Void>() {
public Void call() {
- executeContinueInstanceMyRoleRequestReceived(mexdao);
+ executeCreateInstance(mexdao);
return null;
}
});
- } else /* non-transacted style */ {
- WorkEvent we = new WorkEvent();
- we.setType(WorkEvent.Type.MYROLE_INVOKE);
- we.setIID(mexdao.getInstance().getInstanceId());
- we.setMexId(mexdao.getMessageExchangeId());
- // Could be different to this pid when routing to an older
version
-
we.setProcessId(mexdao.getInstance().getProcess().getProcessId());
+ } else {
+ // TODO avoid reloading the resource routing, it's just been
loaded by the server on mex creation
+ String[] urlMeth = mexdao.getResource().split("~");
+ ResourceRouteDAO rr =
_contexts.dao.getConnection().getResourceRoute(urlMeth[0], urlMeth[1]);
+ // This really should have been caught by the server
+ if (rr == null) throw new BpelEngineException("NoSuchResource:
" + mexdao.getResource());
+ mexdao.setInstance(rr.getInstance());
+ mexdao.setChannel(rr.getPickResponseChannel() + "&" +
rr.getSelectorIdx());
+
+ if (istyle == InvocationStyle.TRANSACTED) {
+ doInstanceWork(mexdao.getInstance().getInstanceId(), new
Callable<Void>() {
+ public Void call() {
+
executeContinueInstanceMyRoleRequestReceived(mexdao);
+ return null;
+ }
+ });
+ } else /* non-transacted style */ {
+ WorkEvent we = new WorkEvent();
+ we.setType(WorkEvent.Type.MYROLE_INVOKE);
+ we.setIID(mexdao.getInstance().getInstanceId());
+ we.setMexId(mexdao.getMessageExchangeId());
+ // Could be different to this pid when routing to an older
version
+
we.setProcessId(mexdao.getInstance().getProcess().getProcessId());
- scheduleWorkEvent(we, null);
- }
+ scheduleWorkEvent(we, null);
+ }
+ }
+ } finally {
+ // If we did not get an ACK during this method, then mark this MEX
as needing an ASYNC wake-up
+ if (mexdao.getStatus() != MessageExchange.Status.ACK)
mexdao.setStatus(MessageExchange.Status.ASYNC);
}
+
}
void onRestMexAck(MessageExchangeDAO mexdao, MessageExchange.Status old,
String url) {
Modified:
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
(original)
+++
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
Wed Dec 3 11:51:01 2008
@@ -246,6 +246,14 @@
_resourceRoutes.put(url, rroute);
}
+ public Set<String> getAllResourceRoutes() {
+ HashSet<String> rs = new HashSet<String>();
+ for (ResourceRouteDAO routeDAO : _resourceRoutes.values()) {
+ rs.add(routeDAO.getUrl() + "~" + routeDAO.getMethod());
+ }
+ return rs;
+ }
+
/**
* @see
org.apache.ode.bpel.dao.ProcessInstanceDAO#getScopes(java.lang.String)
*/
Modified:
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java
(original)
+++
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java
Wed Dec 3 11:51:01 2008
@@ -71,6 +71,8 @@
void unregisterActivityForRecovery(ActivityRecoveryChannel
recoveryChannel);
+ void cancelOutstandingRequests(String channelId);
+
void select(PickResponseChannel pickResponseChannel, Date timeout, boolean
createInstance, Selector[] selectors)
throws FaultException;
Modified:
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java
(original)
+++
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java
Wed Dec 3 11:51:01 2008
@@ -119,7 +119,7 @@
Entry entry = _byChannel.remove(pickResponseChannel);
if (entry != null) {
- _byRid.values().remove(entry);
+ while(_byRid.values().remove(entry));
}
}
Modified:
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java
(original)
+++
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java
Wed Dec 3 11:51:01 2008
@@ -108,6 +108,10 @@
}
+ public void cancelOutstandingRequests(String channelId) {
+ getORM().cancel(channelId);
+ }
+
public void select(PickResponseChannel pickResponseChannel, Date timeout,
boolean createInstance, Selector[] selectors)
throws FaultException {
Modified:
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java
(original)
+++
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java
Wed Dec 3 11:51:01 2008
@@ -68,10 +68,8 @@
/** Has a termination of this handler been requested. */
private boolean _terminated;
-
private boolean _childrenTerminated;
-
EH_EVENT(ParentScopeChannel psc,TerminationChannel tc,
EventHandlerControlChannel ehc, OEventHandler.OEvent o, ScopeFrame scopeFrame) {
_scopeFrame = scopeFrame;
_oevent = o;
@@ -80,9 +78,8 @@
_ehc = ehc;
}
-
public void run() {
- instance(new SELECT(_scopeFrame));
+ instance(new SELECT(_scopeFrame, 0));
}
/**
@@ -96,6 +93,7 @@
_childrenTerminated = true;
}
}
+
/**
* Template that does the actual selection interaction with the runtime
system, and
* then waits on the pick response channel.
@@ -104,8 +102,11 @@
private static final long serialVersionUID = 1L;
- public SELECT(ScopeFrame scopeFrame) {
+ private int _counter;
+
+ public SELECT(ScopeFrame scopeFrame, int counter) {
_scopeFrame = scopeFrame;
+ _counter = counter;
}
/**
@@ -117,7 +118,7 @@
PickResponseChannel pickResponseChannel =
newChannel(PickResponseChannel.class);
if (_oevent.isRestful()) {
getBpelRuntime().checkResourceRoute(_scopeFrame.resolve(_oevent.resource),
- _oevent.messageExchangeId, pickResponseChannel, 0);
+ _oevent.messageExchangeId+_counter,
pickResponseChannel, 0);
} else {
CorrelationKey key;
PartnerLinkInstance pLinkInstance =
_scopeFrame.resolve(_oevent.partnerLink);
@@ -133,17 +134,18 @@
assert key != null;
}
- selector = new
Selector(0,pLinkInstance,_oevent.operation.getName(),
_oevent.operation.getOutput() == null, _oevent.messageExchangeId, key);
+ selector = new
Selector(0,pLinkInstance,_oevent.operation.getName(),
+ _oevent.operation.getOutput() == null,
_oevent.messageExchangeId+_counter, key);
getBpelRuntime().select(pickResponseChannel, null, false,
new Selector[] { selector} );
}
- instance(new WAITING(pickResponseChannel, _scopeFrame));
+ instance(new WAITING(pickResponseChannel, _scopeFrame,
_counter));
} catch(FaultException e){
__log.error(e);
if (_fault == null) {
_fault = createFault(e.getQName(), _oevent);
}
terminateActive();
- instance(new WAITING(null, _scopeFrame));
+ instance(new WAITING(null, _scopeFrame, _counter));
}
}
}
@@ -154,10 +156,12 @@
private class WAITING extends BpelJacobRunnable {
private static final long serialVersionUID = 1L;
private PickResponseChannel _pickResponseChannel;
+ private int _counter;
- private WAITING(PickResponseChannel pickResponseChannel, ScopeFrame
scopeFrame) {
+ private WAITING(PickResponseChannel pickResponseChannel, ScopeFrame
scopeFrame, int counter) {
_pickResponseChannel = pickResponseChannel;
_scopeFrame = scopeFrame;
+ _counter = counter;
}
public void run() {
@@ -168,7 +172,6 @@
if (!_terminated) {
mlset.add(new TerminationChannelListener(_tc) {
private static final long serialVersionUID =
7666910462948788042L;
-
public void terminate() {
terminateActive();
_terminated = true;
@@ -177,13 +180,11 @@
instance(WAITING.this);
}
});
-
}
if (!_stopped) {
mlset.add(new EventHandlerControlChannelListener(_ehc) {
private static final long serialVersionUID =
-1050788954724647970L;
-
public void stop() {
_stopped = true;
if (_pickResponseChannel != null)
@@ -191,18 +192,15 @@
instance(WAITING.this);
}
});
-
}
for (final ActivityInfo ai : _active) {
mlset.add(new ParentScopeChannelListener(ai.parent) {
private static final long serialVersionUID =
5341207762415360982L;
-
public void compensate(OScope scope, SynchChannel ret)
{
_psc.compensate(scope, ret);
instance(WAITING.this);
}
-
public void completed(FaultData faultData,
Set<CompensationHandler> compensations) {
_active.remove(ai);
_comps.addAll(compensations);
@@ -213,7 +211,6 @@
} else
instance(WAITING.this);
}
-
public void cancelled() { completed(null,
CompensationHandler.emptySet()); }
public void failure(String reason, Element data) {
completed(null, CompensationHandler.emptySet()); }
});
@@ -228,6 +225,14 @@
ScopeFrame ehScopeFrame = new ScopeFrame(_oevent,
getBpelRuntime().createScopeInstance(_scopeFrame.scopeInstanceId, _oevent),
_scopeFrame, _comps, _fault);
+ ehScopeFrame.eventScope = true;
+ if (_oevent.isRestful()) {
+
getBpelRuntime().associateEvent(_scopeFrame.resolve(_oevent.resource),
+ _oevent.messageExchangeId+_counter,
_oevent.messageExchangeId+ehScopeFrame.scopeInstanceId);
+ } else {
+
getBpelRuntime().associateEvent(_scopeFrame.resolve(_oevent.partnerLink),
_oevent.operation.getName(),
+ _oevent.messageExchangeId+_counter,
_oevent.messageExchangeId+ehScopeFrame.scopeInstanceId);
+ }
if (_oevent.variable != null) {
Element msgEl =
getBpelRuntime().getMyRequest(mexId);
@@ -250,7 +255,6 @@
}
}
-
try {
for (OScope.CorrelationSet cset :
_oevent.initCorrelations) {
initializeCorrelation(ehScopeFrame.resolve(cset),
ehScopeFrame.resolve(_oevent.variable));
@@ -279,7 +283,7 @@
_fault = createFault(e.getQName(),
_oevent);
terminateActive();
}
- instance(new WAITING(null, _scopeFrame));
+ instance(new WAITING(null, _scopeFrame,
_counter));
return;
}
@@ -299,18 +303,17 @@
if (_childrenTerminated)
replication(child.self).terminate();
if (_terminated || _stopped || _fault != null)
- instance(new WAITING(null, _scopeFrame));
+ instance(new WAITING(null, _scopeFrame,
_counter));
else
- instance(new SELECT(_scopeFrame));
+ instance(new SELECT(_scopeFrame, _counter+1));
}
-
public void onTimeout() {
- instance(new WAITING(null, _scopeFrame));
+ instance(new WAITING(null, _scopeFrame, _counter));
}
public void onCancel() {
- instance(new WAITING(null, _scopeFrame));
+ instance(new WAITING(null, _scopeFrame, _counter));
}
});
Modified:
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java
(original)
+++
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java
Wed Dec 3 11:51:01 2008
@@ -214,7 +214,7 @@
}
public String getDescription() {
- StringBuffer buf = new StringBuffer(declaringScope.name);
+ StringBuffer buf = new StringBuffer(declaringScope.name != null ?
declaringScope.name : "");
buf.append('.');
buf.append(name);
return buf.toString();
Modified:
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java
(original)
+++
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java
Wed Dec 3 11:51:01 2008
@@ -77,6 +77,8 @@
void unregisterActivityForRecovery(ActivityRecoveryChannel
recoveryChannel);
+ void cancelOutstandingRequests(String channelId);
+
void select(PickResponseChannel pickResponseChannel, Date timeout, boolean
createInstance, Selector[] selectors)
throws FaultException;
@@ -124,6 +126,10 @@
void forceFlush();
+ void associateEvent(PartnerLinkInstance plinkInstance, String opName,
String mexRef, String scopeIid);
+
+ void associateEvent(ResourceInstance resourceInstance, String mexRef,
String scopeIid);
+
void reply(PartnerLinkInstance plink, String opName, String bpelmex,
Element element, QName fault)
throws FaultException;
Modified:
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java
(original)
+++
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java
Wed Dec 3 11:51:01 2008
@@ -140,10 +140,10 @@
Entry entry = _byChannel.remove(pickResponseChannel);
if (entry != null)
- _byRid.values().remove(entry);
+ while(_byRid.values().remove(entry));
RestEntry restEntry = _byRestChannel.remove(pickResponseChannel);
if (restEntry != null)
- _byRestRid.values().remove(restEntry);
+ while(_byRestRid.values().remove(restEntry));
}
/**
@@ -184,6 +184,20 @@
}
}
+ public void associateEvent(PartnerLinkInstance plinkInstance, String
opName, String mexRef, String scopeIid) {
+ RequestIdTuple rid = new RequestIdTuple(plinkInstance, opName, mexRef);
+ Entry entry = _byRid.remove(rid);
+ rid.mexId = scopeIid;
+ _byRid.put(rid, entry);
+ }
+
+ public void associateEvent(ResourceInstance resourceInstance, String
method, String mexRef, String scopeIid) {
+ RequestResTuple rid = new RequestResTuple(resourceInstance, method,
mexRef);
+ RestEntry entry = _byRestRid.remove(rid);
+ rid.mexId = scopeIid;
+ _byRestRid.put(rid, entry);
+ }
+
/**
* Release the registration. This method is called when the reply activity
sends a reply corresponding to the
* registration.
Modified:
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java
(original)
+++
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java
Wed Dec 3 11:51:01 2008
@@ -55,13 +55,17 @@
for (OScope.CorrelationSet cset : oreply.initCorrelations)
initializeCorrelation(_scopeFrame.resolve(cset),
_scopeFrame.resolve(oreply.variable));
+ // If this reply matches an event, we have to know which scope it
fits in
+ ScopeFrame eventFrame = _scopeFrame.findEventScope();
+ String eventFrameId = eventFrame == null ? "" :
eventFrame.scopeInstanceId.toString();
+
// send reply
if (oreply.resource != null)
getBpelRuntime().reply(_scopeFrame.resolve(oreply.resource),
- oreply.messageExchangeId, (Element)msg, oreply.fault);
+ oreply.messageExchangeId+eventFrameId, (Element)msg,
oreply.fault);
else
getBpelRuntime().reply(_scopeFrame.resolve(oreply.partnerLink),
oreply.operation.getName(),
- oreply.messageExchangeId, (Element)msg, oreply.fault);
+ oreply.messageExchangeId+eventFrameId, (Element)msg,
oreply.fault);
} catch (FaultException e) {
__log.error(e);
fault = createFault(e.getQName(), oreply);
Modified:
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java
(original)
+++
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java
Wed Dec 3 11:51:01 2008
@@ -119,6 +119,10 @@
return _brc.getInstantiatingUrl();
}
+ public void cancelOutstandingRequests(String channelId) {
+ getORM().cancel(channelId);
+ }
+
public void select(PickResponseChannel pickResponseChannel, Date timeout,
boolean createInstance, Selector[] selectors)
throws FaultException {
@@ -449,6 +453,14 @@
_brc.sendEvent(evt);
}
+ public void associateEvent(PartnerLinkInstance plinkInstance, String
opName, String mexRef, String scopeIid) {
+ getORM().associateEvent(plinkInstance, opName, mexRef, scopeIid);
+ }
+
+ public void associateEvent(ResourceInstance resourceInstance, String
mexRef, String scopeIid) {
+ getORM().associateEvent(resourceInstance,
resourceInstance.getModel().getMethod(), mexRef, scopeIid);
+ }
+
public void reply(PartnerLinkInstance plink, String opName, String
bpelmex, Element element, QName fault) throws FaultException {
String mexid = getORM().release(plink, opName, bpelmex);
if (mexid == null)
Modified:
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java
(original)
+++
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java
Wed Dec 3 11:51:01 2008
@@ -50,6 +50,8 @@
final InstanceGlobals globals;
+ boolean eventScope = false;
+
/** Constructor used to create "fault" scopes. */
ScopeFrame( OScope scopeDef,
Long scopeInstanceId,
@@ -85,6 +87,12 @@
return (parent != null) ? parent.find(scope) : null;
}
+ public ScopeFrame findEventScope() {
+ if (eventScope) return this;
+ else if (parent == null) return null;
+ else return parent.findEventScope();
+ }
+
public VariableInstance resolve(OScope.Variable variable) {
ScopeFrame scopeFrame = find(variable.declaringScope);
if (scopeFrame == null) return null;
Modified:
ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java
(original)
+++
ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java
Wed Dec 3 11:51:01 2008
@@ -126,6 +126,10 @@
//To change body of implemented methods use File | Settings | File
Templates.
}
+ public void cancelOutstandingRequests(String channelId) {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
public void select(PickResponseChannel pickResponseChannel, Date timeout,
boolean createInstance, Selector[] selectors) throws FaultException {
//To change body of implemented methods use File | Settings | File
Templates.
}
Modified:
ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java
(original)
+++
ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java
Wed Dec 3 11:51:01 2008
@@ -128,6 +128,14 @@
return null; //To change body of implemented methods use File |
Settings | File Templates.
}
+ public void associateEvent(PartnerLinkInstance plinkInstance, String
opName, String mexRef, String scopeIid) {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
+ public void associateEvent(ResourceInstance resourceInstance, String
mexRef, String scopeIid) {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
public void reply(ResourceInstance resource, String bpelmex, Element
element, QName fault) throws FaultException {
//To change body of implemented methods use File | Settings | File
Templates.
}
@@ -144,6 +152,10 @@
//To change body of implemented methods use File | Settings | File
Templates.
}
+ public void cancelOutstandingRequests(String channelId) {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
public void select(PickResponseChannel pickResponseChannel, Date timeout,
boolean createInstance, Selector[] selectors) throws FaultException {
//To change body of implemented methods use File | Settings | File
Templates.
}