Author: mriou
Date: Wed Nov 26 18:01:27 2008
New Revision: 721066
URL: http://svn.apache.org/viewvc?rev=721066&view=rev
Log:
Checkpoint commit. Working toward enabling RESTful onEvent handlers and having
them receive messages. Unfortunately, it still goes wrong when the handlers
re-enable themselves. Still working on it.
Added:
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ResourceRouteDaoImpl.java
Modified:
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/BpelDAOConnection.java
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ResourceRouteDAO.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTMessageExchangeImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ResourceRouteDAOImpl.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/SCOPE.java
Modified:
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/BpelDAOConnection.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/BpelDAOConnection.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/BpelDAOConnection.java
(original)
+++
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/BpelDAOConnection.java
Wed Nov 26 18:01:27 2008
@@ -119,4 +119,6 @@
MessageExchangeDAO getMessageExchange(String mexid);
+ ResourceRouteDAO getResourceRoute(String url, String method);
+
}
Modified:
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
(original)
+++
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
Wed Nov 26 18:01:27 2008
@@ -259,6 +259,7 @@
void createActivityRecovery(String channel, long activityId, String
reason, Date dateTime,
Element data, String[] actions, int retries);
+
void createResourceRoute(String url, String method, String
pickResponseChannel, int selectorIdx);
/**
Modified:
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ResourceRouteDAO.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ResourceRouteDAO.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ResourceRouteDAO.java
(original)
+++
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ResourceRouteDAO.java
Wed Nov 26 18:01:27 2008
@@ -1,4 +1,22 @@
package org.apache.ode.bpel.dao;
public interface ResourceRouteDAO {
+ public String getUrl();
+
+ public void setUrl(String url);
+
+ public String getMethod();
+
+ public void setMethod(String method);
+
+ public String getPickResponseChannel();
+
+ public void setPickResponseChannel(String pickResponseChannel);
+
+ public int getSelectorIdx();
+
+ public void setSelectorIdx(int selectorIdx);
+
+ public ProcessInstanceDAO getInstance();
+
}
Modified:
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
(original)
+++
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
Wed Nov 26 18:01:27 2008
@@ -84,6 +84,19 @@
}
}
+ public ResourceRouteDAO getResourceRoute(String url, String method) {
+ try {
+ Criteria criteria = _session.createCriteria(HResourceRoute.class);
+ criteria.add(Expression.eq("url", url));
+ criteria.add(Expression.eq("method", method));
+ HResourceRoute hrr = (HResourceRoute) criteria.uniqueResult();
+ return hrr == null ? null : new ResourceRouteDaoImpl(_sm, hrr);
+ } catch (HibernateException e) {
+ __log.error("DbError", e);
+ throw e;
+ }
+ }
+
public ProcessDAO createProcess(QName pid, QName type, String guid, long
version) {
HProcess process = new HProcess();
process.setProcessId(pid.toString());
Added:
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ResourceRouteDaoImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ResourceRouteDaoImpl.java?rev=721066&view=auto
==============================================================================
---
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ResourceRouteDaoImpl.java
(added)
+++
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ResourceRouteDaoImpl.java
Wed Nov 26 18:01:27 2008
@@ -0,0 +1,52 @@
+package org.apache.ode.daohib.bpel;
+
+import org.apache.ode.bpel.dao.ResourceRouteDAO;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.daohib.bpel.hobj.HResourceRoute;
+import org.apache.ode.daohib.SessionManager;
+
+public class ResourceRouteDaoImpl extends HibernateDao implements
ResourceRouteDAO {
+
+ HResourceRoute _self;
+
+ public ResourceRouteDaoImpl(SessionManager sessionManager, HResourceRoute
hrr) {
+ super(sessionManager, hrr);
+ _self = hrr;
+ }
+
+ public String getUrl() {
+ return _self.getUrl();
+ }
+
+ public void setUrl(String url) {
+ _self.setUrl(url);
+ }
+
+ public String getMethod() {
+ return _self.getMethod();
+ }
+
+ public void setMethod(String method) {
+ _self.setMethod(method);
+ }
+
+ public String getPickResponseChannel() {
+ return _self.getChannelId();
+ }
+
+ public void setPickResponseChannel(String channelId) {
+ _self.setChannelId(channelId);
+ }
+
+ public int getSelectorIdx() {
+ return _self.getIndex();
+ }
+
+ public void setSelectorIdx(int index) {
+ _self.setIndex(index);
+ }
+
+ public ProcessInstanceDAO getInstance() {
+ return new ProcessInstanceDaoImpl(_sm, _self.getInstance());
+ }
+}
Modified:
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
(original)
+++
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
Wed Nov 26 18:01:27 2008
@@ -16,7 +16,7 @@
}
/**
- * @hibernate.property column="URL" not-null="true" size="255"
+ * @hibernate.property column="URL" not-null="true" size="255"
unique="true"
*/
public String getUrl() {
return _url;
Modified:
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java
(original)
+++
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java
Wed Nov 26 18:01:27 2008
@@ -37,11 +37,7 @@
import org.apache.ode.bpel.common.BpelEventFilter;
import org.apache.ode.bpel.common.Filter;
import org.apache.ode.bpel.common.InstanceFilter;
-import org.apache.ode.bpel.dao.BpelDAOConnection;
-import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.bpel.dao.ProcessDAO;
-import org.apache.ode.bpel.dao.ProcessInstanceDAO;
-import org.apache.ode.bpel.dao.ScopeDAO;
+import org.apache.ode.bpel.dao.*;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.evt.ScopeEvent;
import org.apache.ode.utils.ISO8601DateParser;
@@ -303,6 +299,14 @@
return m;
}
+ public ResourceRouteDAO getResourceRoute(String url, String method) {
+ List l = _em.createQuery("select r from ResourceRouteDAOImpl r where
r._url = ?1 and r._method = ?2")
+ .setParameter(1, url).setParameter(2, method).getResultList();
+ if (l.size() == 0) return null;
+ ResourceRouteDAOImpl m = (ResourceRouteDAOImpl) l.get(0);
+ return m;
+ }
+
public EntityManager getEntityManager() {
return _em;
}
Modified:
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java
(original)
+++
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java
Wed Nov 26 18:01:27 2008
@@ -9,7 +9,7 @@
@Id @Column(name="ID")
@GeneratedValue(strategy= GenerationType.AUTO)
private Long _id;
- @Basic @Column(name="URL", length=255)
+ @Basic @Column(name="URL", length=255, unique=true)
private String _url;
@Basic @Column(name="METHOD", length=8)
private String _method;
@@ -47,19 +47,19 @@
_url = url;
}
- public String getChannelId() {
+ public String getPickResponseChannel() {
return _channelId;
}
- public void setChannelId(String channelId) {
+ public void setPickResponseChannel(String channelId) {
_channelId = channelId;
}
- public int getIndex() {
+ public int getSelectorIdx() {
return _index;
}
- public void setIndex(int index) {
+ public void setSelectorIdx(int index) {
_index = index;
}
Modified:
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
(original)
+++
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
Wed Nov 26 18:01:27 2008
@@ -40,10 +40,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.dao.BpelDAOConnection;
-import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
-import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.dao.*;
import org.apache.ode.bpel.evar.ExternalVariableModule;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.iapi.*;
@@ -606,12 +603,30 @@
}
}
- public RESTMessageExchange createMessageExchange(Resource resource, String
foreignKey) throws BpelEngineException {
+ public RESTMessageExchange createMessageExchange(final Resource resource,
String foreignKey) throws BpelEngineException {
_mngmtLock.readLock().lock();
try {
- // Do stuff
ODERESTProcess target = _restServiceMap.get(resource.getUrl());
- if (target == null) throw new BpelEngineException("NoSuchResource:
" + resource.getUrl());
+ if (target == null) {
+ try {
+ QName processId = _contexts.execTransaction(new
Callable<QName>() {
+ public QName call() {
+ ResourceRouteDAO rr = _contexts.dao.getConnection()
+ .getResourceRoute(resource.getUrl(),
resource.getMethod());
+ if (rr == null) return null;
+ ProcessDAO processDao =
rr.getInstance().getProcess();
+ return processDao.getProcessId();
+ }
+ });
+ for (ODERESTProcess odeRestProcess :
_restServiceMap.values()) {
+ if (odeRestProcess._pid.equals(processId)) target =
odeRestProcess;
+ }
+ } catch (Exception e) {
+ throw new BpelEngineException(e);
+ }
+ }
+
+ if (target == null) throw new BpelEngineException("No such
resource: " + resource.getUrl());
assertNoTransaction();
return target.createRESTMessageExchange(resource, foreignKey);
} finally {
@@ -808,15 +823,12 @@
void scheduleRunnable(final Runnable runnable) {
assertTransaction();
_contexts.registerCommitSynchronizer(new Runnable() {
-
public void run() {
_exec.submit(new ServerRunnable(runnable));
}
});
-
}
-
protected void assertTransaction() {
if (!_contexts.isTransacted())
@@ -908,7 +920,6 @@
_lastTimeOfServerCallable.set(System.currentTimeMillis());
}
-
class ServerRunnable implements Runnable {
final Runnable _work;
Modified:
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
(original)
+++
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
Wed Nov 26 18:01:27 2008
@@ -93,7 +93,7 @@
_pid = conf.getProcessId();
_pconf = conf;
_contexts = server._contexts;
- _inMemDao = new BpelDAOConnectionFactoryImpl(_contexts.txManager);
+ _inMemDao = conf.isTransient() ?
((BpelDAOConnectionFactoryImpl)_contexts.dao) : new
BpelDAOConnectionFactoryImpl(_contexts.txManager);
_incomingMexCache = mexCache;
// TODO : do this on a per-partnerlink basis, support transacted
styles.
Modified:
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
(original)
+++
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
Wed Nov 26 18:01:27 2008
@@ -4,6 +4,7 @@
import org.apache.ode.bpel.rapi.ResourceModel;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.dao.ResourceRouteDAO;
import org.apache.ode.bpel.common.FaultException;
import org.apache.ode.bpel.runtime.InvalidProcessException;
import org.apache.ode.bpel.evt.NewProcessInstanceEvent;
@@ -30,14 +31,17 @@
public Collection<String> getInitialResourceUrls() {
if (_staticResources.size() > 0 ) return _staticResources.values();
+ // Caching instantiating resource urls as those can be expressions
ArrayList<String> addresses = new ArrayList<String>();
for (ResourceModel resourceModel :
_processModel.getProvidedResources()) {
- try {
- String addr = _runtime.extractAddress(resourceModel);
- addresses.add(addr);
- _staticResources.put(resourceModel, addr);
- } catch (FaultException e) {
- throw new BpelEngineException(e);
+ if (resourceModel.isInstantiateResource()) {
+ try {
+ String addr = _runtime.extractAddress(resourceModel);
+ addresses.add(addr);
+ _staticResources.put(resourceModel, addr);
+ } catch (FaultException e) {
+ throw new BpelEngineException(e);
+ }
}
}
return addresses;
@@ -45,7 +49,8 @@
void activate() {
bounceProcessDAO();
-
+
+ // Activating instantiating resources
for (ResourceModel resourceModel : _staticResources.keySet()) {
Resource resource = new
Resource(_staticResources.get(resourceModel),
"application/xml", resourceModel.getMethod());
@@ -63,26 +68,27 @@
}
void invokeProcess(final MessageExchangeDAO mexdao) {
- Resource msgResource = getResource(mexdao.getResource());
- mexdao.setProcess(getProcessDAO());
-
if (_pconf.getState() == ProcessState.RETIRED) {
throw new InvalidProcessException("Process is retired.",
InvalidProcessException.RETIRED_CAUSE_CODE);
}
+ mexdao.setProcess(getProcessDAO());
- ProcessInstanceDAO newInstance = getProcessDAO().createInstance(null);
- newInstance.setInstantiatingUrl(mexdao.getResource());
+ Resource instantiatingResource = getResource(mexdao.getResource());
+ InvocationStyle istyle = mexdao.getInvocationStyle();
- // send process instance event
- NewProcessInstanceEvent evt = new
NewProcessInstanceEvent(getProcessModel().getQName(),
- getProcessDAO().getProcessId(), newInstance.getInstanceId());
- evt.setMexId(mexdao.getMessageExchangeId());
- saveEvent(evt, newInstance);
+ if (instantiatingResource != null) {
+ ProcessInstanceDAO newInstance =
getProcessDAO().createInstance(null);
+ newInstance.setInstantiatingUrl(mexdao.getResource());
+
+ // send process instance event
+ NewProcessInstanceEvent evt = new
NewProcessInstanceEvent(getProcessModel().getQName(),
+ getProcessDAO().getProcessId(),
newInstance.getInstanceId());
+ evt.setMexId(mexdao.getMessageExchangeId());
+ saveEvent(evt, newInstance);
-
mexdao.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
- mexdao.setInstance(newInstance);
+
mexdao.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
+ mexdao.setInstance(newInstance);
- if (isInstantiating(msgResource)) {
doInstanceWork(mexdao.getInstance().getInstanceId(), new
Callable<Void>() {
public Void call() {
executeCreateInstance(mexdao);
@@ -90,7 +96,32 @@
}
});
} else {
- throw new UnsupportedOperationException("not yet");
+ // TODO avoid reloading the resource routing, it's just been
loaded by the server on mex creation
+ String[] urlMeth = mexdao.getResource().split("~");
+ ResourceRouteDAO rr =
_contexts.dao.getConnection().getResourceRoute(urlMeth[0], urlMeth[1]);
+ // This really should have been caught by the server
+ if (rr == null) throw new BpelEngineException("NoSuchResource: " +
mexdao.getResource());
+ mexdao.setInstance(rr.getInstance());
+ mexdao.setChannel(rr.getPickResponseChannel() + "&" +
rr.getSelectorIdx());
+
+ if (istyle == InvocationStyle.TRANSACTED) {
+ doInstanceWork(mexdao.getInstance().getInstanceId(), new
Callable<Void>() {
+ public Void call() {
+ executeContinueInstanceMyRoleRequestReceived(mexdao);
+ return null;
+ }
+ });
+ } else /* non-transacted style */ {
+ WorkEvent we = new WorkEvent();
+ we.setType(WorkEvent.Type.MYROLE_INVOKE);
+ we.setIID(mexdao.getInstance().getInstanceId());
+ we.setMexId(mexdao.getMessageExchangeId());
+ // Could be different to this pid when routing to an older
version
+
we.setProcessId(mexdao.getInstance().getProcess().getProcessId());
+
+ scheduleWorkEvent(we, null);
+ }
+
}
}
Modified:
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTMessageExchangeImpl.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTMessageExchangeImpl.java
(original)
+++
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTMessageExchangeImpl.java
Wed Nov 26 18:01:27 2008
@@ -53,7 +53,6 @@
public Future<Status> invokeAsync() {
if (_future != null) return _future;
- if (_request == null) throw new IllegalStateException("Must call
setRequest(...)!");
_future = new ResponseFuture();
_process.enqueueTransaction(new Callable<Void>() {
@@ -73,7 +72,8 @@
if (getStatus() != Status.NEW) throw new
IllegalStateException("Invalid state: " + getStatus());
request();
- MessageExchangeDAO dao =
_process.createMessageExchange(getMessageExchangeId(),
MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+ MessageExchangeDAO dao =
_process.createMessageExchange(getMessageExchangeId(),
+ MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
save(dao);
if (__log.isDebugEnabled()) __log.debug("invoke() EPR= " + _epr + "
==> " + _process);
try {
Modified:
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
(original)
+++
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
Wed Nov 26 18:01:27 2008
@@ -18,15 +18,7 @@
*/
package org.apache.ode.bpel.memdao;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.Status;
@@ -40,11 +32,7 @@
import org.apache.ode.bpel.common.Filter;
import org.apache.ode.bpel.common.InstanceFilter;
import org.apache.ode.bpel.common.ProcessFilter;
-import org.apache.ode.bpel.dao.BpelDAOConnection;
-import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.bpel.dao.ProcessDAO;
-import org.apache.ode.bpel.dao.ProcessInstanceDAO;
-import org.apache.ode.bpel.dao.ScopeDAO;
+import org.apache.ode.bpel.dao.*;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.utils.ISO8601DateParser;
import org.apache.ode.utils.stl.CollectionsX;
@@ -66,6 +54,8 @@
private final List<MessageExchangeDAOImpl> _mexList = new
LinkedList<MessageExchangeDAOImpl>();
private final Map<String, MessageExchangeDAOImpl> _mexStore = new
HashMap<String, MessageExchangeDAOImpl>();
+ private final HashMap<String,ResourceRouteDAOImpl> _resRouteStore = new
HashMap<String,ResourceRouteDAOImpl>();
+
private static AtomicLong counter = new AtomicLong(Long.MAX_VALUE / 2);
private static volatile long _lastRemoval = 0;
@@ -192,8 +182,6 @@
_mexStore.put(mexId, mex);
_mexList.add(mex);
}
-
-
cleanupDeadWood();
// Removing right away on rollback
@@ -201,17 +189,13 @@
public void run() {
synchronized (_mexStore) {
MessageExchangeDAOImpl mexdao = _mexStore.remove(mexId);
-
- if (mexdao != null)
- _mexList.remove(mexdao);
+ if (mexdao != null) _mexList.remove(mexdao);
}
}
});
return mex;
}
-
-
/**
* Remove old message exchanges from the Mex store.
@@ -247,6 +231,22 @@
}
}
+ public ResourceRouteDAO getResourceRoute(String url, String method) {
+ return _resRouteStore.get(url+"~"+method);
+ }
+
+ public void addResourceRoute(ResourceRouteDAOImpl rroute) {
+ _resRouteStore.put(rroute.getUrl()+"~"+rroute.getMethod(), rroute);
+ }
+
+ public void cleanupResourceRoutes(Long piid) {
+ Iterator<ResourceRouteDAOImpl> rrIter =
_resRouteStore.values().iterator();
+ while (rrIter.hasNext()) {
+ ResourceRouteDAOImpl rr = rrIter.next();
+ if (rr.getInstance().getInstanceId().equals(piid)) rrIter.remove();
+ }
+ }
+
private int compareInstanceUsingKey(String key, ProcessInstanceDAO
instanceDAO1, ProcessInstanceDAO instanceDAO2) {
String s1 = null;
String s2 = null;
Modified:
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
(original)
+++
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
Wed Nov 26 18:01:27 2008
@@ -148,11 +148,14 @@
__log.debug("Removing completed process instance " +
instance.getInstanceId() + " from in-memory store.");
_instancesAge.remove(instance.getInstanceId());
ProcessInstanceDAO removed =
_instances.remove(instance.getInstanceId());
+ _conn.cleanupResourceRoutes(instance.getInstanceId());
+
if (removed == null) {
// Checking for leftover instances that should be removed
ArrayList<Long> removals = new ArrayList<Long>(_instancesToRemove);
for (Long iid : removals) {
_instances.remove(iid);
+ _conn.cleanupResourceRoutes(iid);
}
_instancesToRemove.removeAll(removals);
Modified:
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
(original)
+++
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
Wed Nov 26 18:01:27 2008
@@ -241,7 +241,9 @@
}
public void createResourceRoute(String url, String method, String
pickResponseChannel, int selectorIdx) {
- _resourceRoutes.put(url, new ResourceRouteDAOImpl(url, method,
pickResponseChannel, selectorIdx));
+ ResourceRouteDAOImpl rroute = new ResourceRouteDAOImpl(url, method,
pickResponseChannel, selectorIdx, this);
+ ((BpelDAOConnectionImpl)_conn).addResourceRoute(rroute);
+ _resourceRoutes.put(url, rroute);
}
/**
@@ -419,4 +421,8 @@
public void setExecutionStateCounter(int stateCounter) {
_execStateCount = stateCounter;
}
+
+ public Map<String, ResourceRouteDAO> geResourceRoutes() {
+ return _resourceRoutes;
+ }
}
Modified:
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ResourceRouteDAOImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ResourceRouteDAOImpl.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ResourceRouteDAOImpl.java
(original)
+++
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ResourceRouteDAOImpl.java
Wed Nov 26 18:01:27 2008
@@ -1,6 +1,7 @@
package org.apache.ode.bpel.memdao;
import org.apache.ode.bpel.dao.ResourceRouteDAO;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
public class ResourceRouteDAOImpl extends DaoBaseImpl implements
ResourceRouteDAO {
private Long _id;
@@ -9,12 +10,16 @@
private String pickResponseChannel;
private int selectorIdx;
- public ResourceRouteDAOImpl(String url, String method, String
pickResponseChannel, int selectorIdx) {
+ private ProcessInstanceDaoImpl instance;
+
+ public ResourceRouteDAOImpl(String url, String method, String
pickResponseChannel,
+ int selectorIdx, ProcessInstanceDaoImpl
instance) {
_id = IdGen.newProcessId();
this.url = url;
this.method = method;
this.pickResponseChannel = pickResponseChannel;
this.selectorIdx = selectorIdx;
+ this.instance = instance;
}
public String getUrl() {
@@ -48,4 +53,8 @@
public void setSelectorIdx(int selectorIdx) {
this.selectorIdx = selectorIdx;
}
+
+ public ProcessInstanceDAO getInstance() {
+ return instance;
+ }
}
Modified:
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java
(original)
+++
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java
Wed Nov 26 18:01:27 2008
@@ -115,22 +115,27 @@
Selector selector;
try {
PickResponseChannel pickResponseChannel =
newChannel(PickResponseChannel.class);
- CorrelationKey key;
- PartnerLinkInstance pLinkInstance =
_scopeFrame.resolve(_oevent.partnerLink);
- if (_oevent.matchCorrelation == null) {
- // Adding a route for opaque correlation. In this case
correlation is done on "out-of-band" session id.
- String sessionId =
getBpelRuntime().fetchMySessionId(pLinkInstance);
- key = new CorrelationKey(-1, new String[] {sessionId});
+ if (_oevent.isRestful()) {
+
getBpelRuntime().checkResourceRoute(_scopeFrame.resolve(_oevent.resource),
+ _oevent.messageExchangeId, pickResponseChannel, 0);
} else {
- if
(!getBpelRuntime().isCorrelationInitialized(_scopeFrame.resolve(_oevent.matchCorrelation)))
{
- throw new
FaultException(_oevent.getOwner().constants.qnCorrelationViolation,"Correlation
not initialized.");
+ CorrelationKey key;
+ PartnerLinkInstance pLinkInstance =
_scopeFrame.resolve(_oevent.partnerLink);
+ if (_oevent.matchCorrelation == null) {
+ // Adding a route for opaque correlation. In this case
correlation is done on "out-of-band" session id.
+ String sessionId =
getBpelRuntime().fetchMySessionId(pLinkInstance);
+ key = new CorrelationKey(-1, new String[] {sessionId});
+ } else {
+ if
(!getBpelRuntime().isCorrelationInitialized(_scopeFrame.resolve(_oevent.matchCorrelation)))
{
+ throw new
FaultException(_oevent.getOwner().constants.qnCorrelationViolation,"Correlation
not initialized.");
+ }
+ key =
getBpelRuntime().readCorrelation(_scopeFrame.resolve(_oevent.matchCorrelation));
+ assert key != null;
}
- key =
getBpelRuntime().readCorrelation(_scopeFrame.resolve(_oevent.matchCorrelation));
- assert key != null;
- }
- selector = new
Selector(0,pLinkInstance,_oevent.operation.getName(),
_oevent.operation.getOutput() == null, _oevent.messageExchangeId, key);
- getBpelRuntime().select(pickResponseChannel, null, false, new
Selector[] { selector} );
+ selector = new
Selector(0,pLinkInstance,_oevent.operation.getName(),
_oevent.operation.getOutput() == null, _oevent.messageExchangeId, key);
+ getBpelRuntime().select(pickResponseChannel, null, false,
new Selector[] { selector} );
+ }
instance(new WAITING(pickResponseChannel, _scopeFrame));
} catch(FaultException e){
__log.error(e);
@@ -218,14 +223,11 @@
mlset.add(new
PickResponseChannelListener(_pickResponseChannel) {
private static final long serialVersionUID =
-4929999153478677288L;
-
public void onRequestRcvd(int selectorIdx, String
mexId) {
// The receipt of the message causes a new scope
to be created:
ScopeFrame ehScopeFrame = new ScopeFrame(_oevent,
getBpelRuntime().createScopeInstance(_scopeFrame.scopeInstanceId, _oevent),
- _scopeFrame,
- _comps,
- _fault);
+ _scopeFrame, _comps, _fault);
if (_oevent.variable != null) {
Element msgEl =
getBpelRuntime().getMyRequest(mexId);
@@ -254,7 +256,7 @@
initializeCorrelation(ehScopeFrame.resolve(cset),
ehScopeFrame.resolve(_oevent.variable));
}
- if (_oevent.partnerLink.hasPartnerRole()) {
+ if (_oevent.partnerLink != null &&
_oevent.partnerLink.hasPartnerRole()) {
// Trying to initialize partner epr based
on a message-provided epr/session.
if
(!getBpelRuntime().isPartnerRoleEndpointInitialized(ehScopeFrame
.resolve(_oevent.partnerLink)) ||
!_oevent.partnerLink.initializePartnerRole) {
@@ -271,9 +273,6 @@
partnersSessionId);
}
-
-
-
} catch (FaultException e) {
__log.error(e);
if (_fault == null) {
@@ -284,15 +283,11 @@
return;
}
-
-
// load 'onMessage' activity; we'll do this even
if a stop/terminate has been
// requested because we cannot undo the receipt of
the message at this point.
- ActivityInfo child = new
ActivityInfo(genMonotonic(),
- _oevent.activity,
+ ActivityInfo child = new
ActivityInfo(genMonotonic(), _oevent.activity,
newChannel(TerminationChannel.class),
newChannel(ParentScopeChannel.class));
-
_active.add(child);
LinkFrame lf = new LinkFrame(null);
@@ -301,9 +296,7 @@
// If we previously terminated the other
activities, then we do the same
// here; this is easier than undoing the receive.
- if (_childrenTerminated)
- replication(child.self).terminate();
-
+ if (_childrenTerminated)
replication(child.self).terminate();
if (_terminated || _stopped || _fault != null)
instance(new WAITING(null, _scopeFrame));
Modified:
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/SCOPE.java
URL:
http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/SCOPE.java?rev=721066&r1=721065&r2=721066&view=diff
==============================================================================
---
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/SCOPE.java
(original)
+++
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/SCOPE.java
Wed Nov 26 18:01:27 2008
@@ -91,28 +91,46 @@
getBpelRuntime().initializePartnerLinks(_scopeFrame.scopeInstanceId,
_oscope.partnerLinks.values());
- // Initializing resource values
- for (Map.Entry<String,OResource> resource :
_oscope.resource.entrySet()) {
+ initializeResources();
+
+ sendEvent(new ScopeStartEvent());
+ instance(new ACTIVE());
+ }
+
+ private void initializeResources() {
+ // Filter instantiating resource to handle it first
+ ArrayList<OResource> resources = new ArrayList<OResource>();
+ OResource instantiating = null;
+ for (OResource resource : _oscope.resource.values()) {
+ if (resource.isInstantiateResource()) instantiating = resource;
+ else resources.add(resource);
+ }
+
+ if (instantiating != null) {
try {
String url =
getBpelRuntime().getExpLangRuntime().evaluateAsString(
- resource.getValue().getSubpath(),
getEvaluationContext());
- // TODO implement a better URL building heuristic
- if (resource.getValue().isInstantiateResource()) {
- url = url + "/" + getBpelRuntime().getInstanceId();
- getBpelRuntime().initializeInstantiatingUrl(url);
- } else {
- url = getBpelRuntime().getInstantiatingUrl() + "/" + url;
- }
-
-
getBpelRuntime().initializeResource(_scopeFrame.scopeInstanceId,
resource.getValue(), url);
+ instantiating.getSubpath(), getEvaluationContext());
+ url = url + "/" + getBpelRuntime().getInstanceId();
+ getBpelRuntime().initializeInstantiatingUrl(url);
+
getBpelRuntime().initializeResource(_scopeFrame.scopeInstanceId, instantiating,
url);
} catch (FaultException e) {
- _self.parent.completed(new FaultData(e.getQName(),
resource.getValue(),
+ _self.parent.completed(new FaultData(e.getQName(),
instantiating,
"Error in resource evaluation: " + e.toString()),
CompensationHandler.emptySet());
}
}
- sendEvent(new ScopeStartEvent());
- instance(new ACTIVE());
+ for (OResource resource : resources) {
+ try {
+ String url =
getBpelRuntime().getExpLangRuntime().evaluateAsString(
+ resource.getSubpath(), getEvaluationContext());
+ url = getBpelRuntime().getInstantiatingUrl() + url;
+
+
getBpelRuntime().initializeResource(_scopeFrame.scopeInstanceId, resource, url);
+ } catch (FaultException e) {
+ _self.parent.completed(new FaultData(e.getQName(), resource,
+ "Error in resource evaluation: " + e.toString()),
CompensationHandler.emptySet());
+ }
+ }
}
private List<CompensationHandler> findCompensationData(OScope scope) {