ODE-974: Sort the processes so that the active one comes last, changed findInstance query to search based on the process type.
Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/e11afe7b Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/e11afe7b Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/e11afe7b Branch: refs/heads/master Commit: e11afe7b4df9652ac828749f0597abad8c75a387 Parents: d4c3aa7 Author: sathwik <[email protected]> Authored: Wed Jun 28 15:41:36 2017 +0530 Committer: sathwik <[email protected]> Committed: Wed Jun 28 15:41:36 2017 +0530 ---------------------------------------------------------------------- .../org/apache/ode/bpel/dao/ProcessDAO.java | 2 +- .../apache/ode/bpel/engine/BpelEngineImpl.java | 16 ++++++++++++++++ .../org/apache/ode/bpel/engine/BpelProcess.java | 20 +++++++++++++------- .../apache/ode/daohib/bpel/ProcessDaoImpl.java | 7 +++++-- .../ode/daohib/bpel/hobj/HCorrelationSet.java | 2 +- .../org/apache/ode/dao/jpa/ProcessDAOImpl.java | 10 ++++++---- 6 files changed, 42 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/e11afe7b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java ---------------------------------------------------------------------- diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java index 6563524..c3c16b7 100644 --- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java +++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java @@ -121,7 +121,7 @@ public interface ProcessDAO { /** - * Locates process instances for a specific process version that matches correlation key and instance state + * Find process instances that matches correlation key and instance state * @param ckey * Correlation key * @param processInstanceState http://git-wip-us.apache.org/repos/asf/ode/blob/e11afe7b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java index 03e9ee8..132ff66 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java @@ -61,6 +61,8 @@ import javax.wsdl.Operation; import javax.wsdl.PortType; import javax.xml.namespace.QName; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -516,6 +518,20 @@ public class BpelEngineImpl implements BpelEngine { } if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == JobType.MEX_MATCHER) { List<BpelProcess> processes = getAllProcesses(we.getProcessId()); + + if(processes.size() > 1) { + //sort the processes so that the active process is at end of the list, required for enqueueing an early message + Comparator<BpelProcess> compartor = new Comparator<BpelProcess>(){ + @Override + public int compare(BpelProcess o1, BpelProcess o2) { + if ( o1.isActive() && !o2.isActive() ) return 1; + if ( !o1.isActive() && o2.isActive() ) return -1; + return 0; + } + }; + Collections.sort(processes, compartor); + } + boolean routed = false; //try to find the target process and execute http://git-wip-us.apache.org/repos/asf/ode/blob/e11afe7b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java index e9ad33f..fad7e86 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java @@ -42,6 +42,7 @@ import org.apache.ode.bpel.common.CorrelationKey; import org.apache.ode.bpel.common.FaultException; import org.apache.ode.bpel.common.ProcessState; import org.apache.ode.bpel.dao.BpelDAOConnection; +import org.apache.ode.bpel.dao.CorrelatorDAO; import org.apache.ode.bpel.dao.MessageExchangeDAO; import org.apache.ode.bpel.dao.ProcessDAO; import org.apache.ode.bpel.dao.ProcessInstanceDAO; @@ -324,11 +325,9 @@ public class BpelProcess { for( Iterator<CorrelationKey> keyItr = routing.wholeKeySet.iterator(); (keyItr.hasNext() && !routed);) { CorrelationKey key = keyItr.next(); - __log.info("noRoutingMatch: Finding active instance correlated with {} and process pid {}",key,_pid); + __log.debug("noRoutingMatch: Finding active instance correlated with {}",key); - // We need to make sure the PID of process of the instance is same as that of the - // partnerlink's associated process in the iteration. Otherwise we might end up - // associating wrong correlator with the mex. + //Find instances across all version of a process that match the correlation key and instance state. Collection<ProcessInstanceDAO> instanceDaoList = getProcessDAO().findInstance(key,ProcessState.STATE_ACTIVE); if (!(instanceDaoList.isEmpty())) { @@ -345,10 +344,17 @@ public class BpelProcess { if(!(intersectionInstanceSet.isEmpty())) { ProcessInstanceDAO instance = intersectionInstanceSet.iterator().next(); mex.getDAO().setProcess(instance.getProcess()); - target.noRoutingMatch(mex, routing); + + // correlator should be the one obtained from the instance's process itself, as the findInstance query might return + // instances of other process version. + CorrelatorDAO correlator = instance.getProcess().getCorrelator(routing.correlator.getCorrelatorId()); + RoutingInfo matchedRouting = new RoutingInfo(routing.messageRoute,routing.matchedKeySet,correlator,routing.wholeKeySet); + + target.noRoutingMatch(mex, matchedRouting); routed = true; - __log.info("noRoutingMatch: Active instance found instanceID: {} and process pid {}",instance.getInstanceId(),_pid); + __log.info("noRoutingMatch: Active instance found instanceID: {} and process pid {}",instance.getInstanceId(),instance.getProcess().getType()); + } } } } @@ -357,7 +363,7 @@ public class BpelProcess { return routed; } - private boolean isActive() { + protected boolean isActive() { return _pconf.getState() == org.apache.ode.bpel.iapi.ProcessState.ACTIVE; } http://git-wip-us.apache.org/repos/asf/ode/blob/e11afe7b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java ---------------------------------------------------------------------- diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java index 84b0e4b..448b783 100644 --- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java +++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java @@ -311,12 +311,15 @@ public class ProcessDaoImpl extends HibernateDao implements ProcessDAO, Deferred return _process.getGuid(); } - @Override + /** + * Find instances across all versions of a process that match the correlation key and instance state. + */ public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey, short processInstanceState) { entering("ProcessDaoImpl.findInstance"); Query qry = getSession().getNamedQuery(HCorrelationSet.SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS); qry.setParameter("ckey", ckey.toCanonicalString()); - qry.setEntity("process", getHibernateObj()); + qry.setString("processTypeNamespace", _process.getTypeNamespace()); + qry.setString("processTypeName", _process.getTypeName()); qry.setShort("state", processInstanceState); Collection<HProcessInstance> resultList = qry.list(); http://git-wip-us.apache.org/repos/asf/ode/blob/e11afe7b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java ---------------------------------------------------------------------- diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java index 1656989..8fd7673 100644 --- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java +++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java @@ -30,7 +30,7 @@ import java.util.Collection; * @hibernate.query name="SELECT_CORSETS_BY_PROCESS_STATES" query="from HCorrelationSet as c left join fetch c.process left join fetch c.instance where c.instance.state in (:states)" * @hibernate.query name="SELECT_INSTANCES_BY_CORSETS" query="select cs.scope.instance from HCorrelationSet as cs where cs.value = :ckey" * @hibernate.query name="SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS" query="select cs.scope.instance from HCorrelationSet as cs where cs.value = :ckey and - * cs.process = :process and cs.instance.state = :state" + * cs.process.typeNamespace = :processTypeNamespace and cs.process.typeName = :processTypeName and cs.instance.state = :state" */ public class HCorrelationSet extends HObject{ public static final String SELECT_CORSET_IDS_BY_INSTANCES = "SELECT_CORSET_IDS_BY_INSTANCES"; http://git-wip-us.apache.org/repos/asf/ode/blob/e11afe7b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java ---------------------------------------------------------------------- diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java index f38cd0a..c6900f6 100644 --- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java +++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java @@ -46,7 +46,7 @@ import java.util.List; @NamedQuery(name="InstanceByCKey", query="select cs._scope._processInstance from CorrelationSetDAOImpl as cs where cs._correlationKey = :ckey"), @NamedQuery(name="CorrelatorByKey", query="select c from CorrelatorDAOImpl as c where c._correlatorKey = :ckey and c._process = :process"), @NamedQuery(name="InstanceByCKeyProcessState", query="select cs._scope._processInstance from CorrelationSetDAOImpl as cs where cs._correlationKey = :ckey and " - + "cs._scope._processInstance._process = :process and cs._scope._processInstance._state = :state") + + "cs._scope._processInstance._process._processType = :processType and cs._scope._processInstance._state = :state") }) public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO { private static final Logger __log = LoggerFactory.getLogger(ProcessDAOImpl.class); @@ -229,13 +229,15 @@ public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO { return qry.getResultList(); } - @Override + /** + * Find instances across all versions of a process that match the correlation key and instance state. + */ public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey, short processInstanceState) { //need to make this query more efficient Query qry = getEM().createNamedQuery("InstanceByCKeyProcessState"); qry.setParameter("ckey", ckey.toCanonicalString()); - qry.setParameter("process", this); - qry.setParameter("state", ProcessState.STATE_ACTIVE); + qry.setParameter("processType", getType().toString()); + qry.setParameter("state", processInstanceState); return qry.getResultList(); } }
