Repository: ode Updated Branches: refs/heads/ode-1.3.x 58b1a735d -> 97c797240
ODE-974: Sort the processes so that the active one comes last; change the findInstance query to search based on the process type. Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/51d58ddd Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/51d58ddd Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/51d58ddd Branch: refs/heads/ode-1.3.x Commit: 51d58ddd70dca2fdd7c28d757780134085964ac2 Parents: 58b1a73 Author: sathwik <[email protected]> Authored: Wed Mar 16 11:47:17 2016 +0530 Committer: sathwik <[email protected]> Committed: Wed Mar 16 11:47:17 2016 +0530 ---------------------------------------------------------------------- .../org/apache/ode/bpel/dao/ProcessDAO.java | 2 +- .../apache/ode/bpel/engine/BpelEngineImpl.java | 18 +++++++++++++++++ .../org/apache/ode/bpel/engine/BpelProcess.java | 21 ++++++++++++-------- .../apache/ode/daohib/bpel/ProcessDaoImpl.java | 7 +++++-- .../ode/daohib/bpel/hobj/HCorrelationSet.java | 2 +- .../org/apache/ode/dao/jpa/ProcessDAOImpl.java | 10 ++++++---- 6 files changed, 44 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/51d58ddd/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java ---------------------------------------------------------------------- diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java index c68ac53..584cecd 100644 --- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java +++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java @@ -120,7 +120,7 @@ public interface ProcessDAO { Collection<ProcessInstanceDAO> getActiveInstances(); /** - * Locates process instances for a specific process version that matches correlation key and instance state + * Find process instances that matches correlation key and instance state * @param ckey * 
Correlation key * @param processInstanceState http://git-wip-us.apache.org/repos/asf/ode/blob/51d58ddd/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java index 63d829e..5e49690 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java @@ -20,6 +20,8 @@ package org.apache.ode.bpel.engine; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -69,6 +71,8 @@ import org.apache.ode.utils.msg.MessageBundle; import org.w3c.dom.Document; import org.w3c.dom.Element; + + /** * Implementation of the {@link BpelEngine} interface: provides the server methods that should be invoked in the context of a * transaction. 
@@ -482,6 +486,20 @@ public class BpelEngineImpl implements BpelEngine { if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == JobType.MEX_MATCHER) { List<BpelProcess> processes = getAllProcesses(we.getProcessId()); + + if(processes.size() > 1) { + //sort the processes so that the active process is at end of the list, required for enqueueing an early message + Comparator<BpelProcess> compartor = new Comparator<BpelProcess>(){ + @Override + public int compare(BpelProcess o1, BpelProcess o2) { + if ( o1.isActive() && !o2.isActive() ) return 1; + if ( !o1.isActive() && o2.isActive() ) return -1; + return 0; + } + }; + Collections.sort(processes, compartor); + } + boolean routed = false; //try to find the target process and execute http://git-wip-us.apache.org/repos/asf/ode/blob/51d58ddd/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java index 606b73b..0704901 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java @@ -43,6 +43,7 @@ import org.apache.ode.bpel.common.CorrelationKey; import org.apache.ode.bpel.common.FaultException; import org.apache.ode.bpel.common.ProcessState; import org.apache.ode.bpel.dao.BpelDAOConnection; +import org.apache.ode.bpel.dao.CorrelatorDAO; import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable; import org.apache.ode.bpel.dao.ProcessDAO; import org.apache.ode.bpel.dao.ProcessInstanceDAO; @@ -313,7 +314,7 @@ public class BpelProcess { target.noRoutingMatch(mex, routing); routed = true; - __log.info("noRoutingMatch: mex has been registered for a future Instance and process pid {}",_pid); + __log.debug("noRoutingMatch: mex has been registered for a future Instance of process 
pid {}",_pid); return routed; } @@ -323,11 +324,9 @@ public class BpelProcess { (keyItr.hasNext() && !routed);) { CorrelationKey key = keyItr.next(); - __log.info("noRoutingMatch: Finding active instance correlated with {} and process pid {}",key,_pid); + __log.debug("noRoutingMatch: Finding active instance correlated with {}",key); - // We need to make sure the PID of process of the instance is same as that of the - // partnerlink's associated process in the iteration. Otherwise we might end up - // associating wrong correlator with the mex. + //Find instances across all version of a process that match the correlation key and instance state. Collection<ProcessInstanceDAO> instanceDaoList = getProcessDAO().findInstance(key,ProcessState.STATE_ACTIVE); if (!(instanceDaoList.isEmpty())) { @@ -344,10 +343,16 @@ public class BpelProcess { if(!(intersectionInstanceSet.isEmpty())) { ProcessInstanceDAO instance = intersectionInstanceSet.iterator().next(); mex.getDAO().setProcess(instance.getProcess()); - target.noRoutingMatch(mex, routing); + + // correlator should be the one obtained from the instance's process itself, as the findInstance query might return + // instances of other process version. 
+ CorrelatorDAO correlator = instance.getProcess().getCorrelator(routing.correlator.getCorrelatorId()); + RoutingInfo matchedRouting = new RoutingInfo(routing.messageRoute,routing.matchedKeySet,correlator,routing.wholeKeySet); + + target.noRoutingMatch(mex, matchedRouting); routed = true; - __log.info("noRoutingMatch: Active instance found instanceID: {} and process pid {}",instance.getInstanceId(),_pid); + __log.info("noRoutingMatch: Active instance found instanceID: {} and process pid {}",instance.getInstanceId(),instance.getProcess().getType()); } } } @@ -356,7 +361,7 @@ public class BpelProcess { return routed; } - private boolean isActive() { + protected boolean isActive() { return _pconf.getState() == org.apache.ode.bpel.iapi.ProcessState.ACTIVE; } http://git-wip-us.apache.org/repos/asf/ode/blob/51d58ddd/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java ---------------------------------------------------------------------- diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java index 8d45610..d3345fb 100644 --- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java +++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java @@ -307,12 +307,15 @@ public class ProcessDaoImpl extends HibernateDao implements ProcessDAO, Deferred return _process.getGuid(); } - @Override + /** + * Find instances across all versions of a process that match the correlation key and instance state. 
+ */ public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey, short processInstanceState) { entering("ProcessDaoImpl.findInstance"); Query qry = getSession().getNamedQuery(HCorrelationSet.SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS); qry.setParameter("ckey", ckey.toCanonicalString()); - qry.setEntity("process", getHibernateObj()); + qry.setString("processTypeNamespace", _process.getTypeNamespace()); + qry.setString("processTypeName", _process.getTypeName()); qry.setShort("state", processInstanceState); Collection<HProcessInstance> resultList = qry.list(); http://git-wip-us.apache.org/repos/asf/ode/blob/51d58ddd/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java ---------------------------------------------------------------------- diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java index 1656989..8fd7673 100644 --- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java +++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java @@ -30,7 +30,7 @@ import java.util.Collection; * @hibernate.query name="SELECT_CORSETS_BY_PROCESS_STATES" query="from HCorrelationSet as c left join fetch c.process left join fetch c.instance where c.instance.state in (:states)" * @hibernate.query name="SELECT_INSTANCES_BY_CORSETS" query="select cs.scope.instance from HCorrelationSet as cs where cs.value = :ckey" * @hibernate.query name="SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS" query="select cs.scope.instance from HCorrelationSet as cs where cs.value = :ckey and - * cs.process = :process and cs.instance.state = :state" + * cs.process.typeNamespace = :processTypeNamespace and cs.process.typeName = :processTypeName and cs.instance.state = :state" */ public class HCorrelationSet extends HObject{ public static final String SELECT_CORSET_IDS_BY_INSTANCES = 
"SELECT_CORSET_IDS_BY_INSTANCES"; http://git-wip-us.apache.org/repos/asf/ode/blob/51d58ddd/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java ---------------------------------------------------------------------- diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java index c109fda..1e28662 100644 --- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java +++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java @@ -46,7 +46,7 @@ import java.util.List; @NamedQuery(name="InstanceByCKey", query="select cs._scope._processInstance from CorrelationSetDAOImpl as cs where cs._correlationKey = :ckey"), @NamedQuery(name="CorrelatorByKey", query="select c from CorrelatorDAOImpl as c where c._correlatorKey = :ckey and c._process = :process"), @NamedQuery(name="InstanceByCKeyProcessState", query="select cs._scope._processInstance from CorrelationSetDAOImpl as cs where cs._correlationKey = :ckey and " - + "cs._scope._processInstance._process = :process and cs._scope._processInstance._state = :state") + + "cs._scope._processInstance._process._processType = :processType and cs._scope._processInstance._state = :state") }) public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO { private static final Logger __log = LoggerFactory.getLogger(ProcessDAOImpl.class); @@ -229,13 +229,15 @@ public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO { return qry.getResultList(); } - @Override + /** + * Find instances across all versions of a process that match the correlation key and instance state. 
+ */ public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey, short processInstanceState) { //need to make this query more efficient Query qry = getEM().createNamedQuery("InstanceByCKeyProcessState"); qry.setParameter("ckey", ckey.toCanonicalString()); - qry.setParameter("process", this); - qry.setParameter("state", ProcessState.STATE_ACTIVE); + qry.setParameter("processType", getType().toString()); + qry.setParameter("state", processInstanceState); return qry.getResultList(); } }
