ODE-974: Sort the processes so that the active one comes last, changed 
findInstance query to search based on the process type.


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/e11afe7b
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/e11afe7b
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/e11afe7b

Branch: refs/heads/master
Commit: e11afe7b4df9652ac828749f0597abad8c75a387
Parents: d4c3aa7
Author: sathwik <[email protected]>
Authored: Wed Jun 28 15:41:36 2017 +0530
Committer: sathwik <[email protected]>
Committed: Wed Jun 28 15:41:36 2017 +0530

----------------------------------------------------------------------
 .../org/apache/ode/bpel/dao/ProcessDAO.java     |  2 +-
 .../apache/ode/bpel/engine/BpelEngineImpl.java  | 16 ++++++++++++++++
 .../org/apache/ode/bpel/engine/BpelProcess.java | 20 +++++++++++++-------
 .../apache/ode/daohib/bpel/ProcessDaoImpl.java  |  7 +++++--
 .../ode/daohib/bpel/hobj/HCorrelationSet.java   |  2 +-
 .../org/apache/ode/dao/jpa/ProcessDAOImpl.java  | 10 ++++++----
 6 files changed, 42 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/e11afe7b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
----------------------------------------------------------------------
diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java 
b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
index 6563524..c3c16b7 100644
--- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
+++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
@@ -121,7 +121,7 @@ public interface ProcessDAO {
 
 
     /**
-     * Locates process instances for a specific process version that matches 
correlation key and instance state
+     * Find process instances that matches correlation key and instance state
      * @param ckey
      *          Correlation key
      * @param processInstanceState

http://git-wip-us.apache.org/repos/asf/ode/blob/e11afe7b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
----------------------------------------------------------------------
diff --git 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
index 03e9ee8..132ff66 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
@@ -61,6 +61,8 @@ import javax.wsdl.Operation;
 import javax.wsdl.PortType;
 import javax.xml.namespace.QName;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -516,6 +518,20 @@ public class BpelEngineImpl implements BpelEngine {
                 }
                 if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == 
JobType.MEX_MATCHER) {
                     List<BpelProcess> processes = 
getAllProcesses(we.getProcessId());
+
+                    if(processes.size() > 1) {
+                        //sort the processes so that the active process is at 
end of the list, required for enqueueing an early message
+                        Comparator<BpelProcess> compartor = new 
Comparator<BpelProcess>(){
+                            @Override
+                            public int compare(BpelProcess o1, BpelProcess o2) 
{
+                                if ( o1.isActive() && !o2.isActive() ) return 
1;
+                                if ( !o1.isActive() && o2.isActive() ) return 
-1;
+                                return 0;
+                            }
+                        };
+                        Collections.sort(processes, compartor);
+                    }
+
                     boolean routed = false;
 
                     //try to find the target process and execute

http://git-wip-us.apache.org/repos/asf/ode/blob/e11afe7b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
----------------------------------------------------------------------
diff --git 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
index e9ad33f..fad7e86 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
@@ -42,6 +42,7 @@ import org.apache.ode.bpel.common.CorrelationKey;
 import org.apache.ode.bpel.common.FaultException;
 import org.apache.ode.bpel.common.ProcessState;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.CorrelatorDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
@@ -324,11 +325,9 @@ public class BpelProcess {
                         for( Iterator<CorrelationKey> keyItr = 
routing.wholeKeySet.iterator();
                                 (keyItr.hasNext() && !routed);) {
                             CorrelationKey key = keyItr.next();
-                            __log.info("noRoutingMatch: Finding active 
instance correlated with {} and process pid {}",key,_pid);
+                            __log.debug("noRoutingMatch: Finding active 
instance correlated with {}",key);
 
-                            // We need to make sure the PID of process of the 
instance is same as that of the
-                            // partnerlink's associated process in the 
iteration. Otherwise we might end up
-                            // associating wrong correlator with the mex.
+                            //Find instances across all version of a process 
that match the correlation key and instance state.
                             Collection<ProcessInstanceDAO> instanceDaoList = 
getProcessDAO().findInstance(key,ProcessState.STATE_ACTIVE);
 
                             if (!(instanceDaoList.isEmpty())) {
@@ -345,10 +344,17 @@ public class BpelProcess {
                         if(!(intersectionInstanceSet.isEmpty())) {
                             ProcessInstanceDAO instance = 
intersectionInstanceSet.iterator().next();
                             mex.getDAO().setProcess(instance.getProcess());
-                            target.noRoutingMatch(mex, routing);
+
+                            // correlator should be the one obtained from the 
instance's process itself, as the findInstance query might return 
+                            // instances of other process version.
+                            CorrelatorDAO correlator = 
instance.getProcess().getCorrelator(routing.correlator.getCorrelatorId());
+                            RoutingInfo matchedRouting = new 
RoutingInfo(routing.messageRoute,routing.matchedKeySet,correlator,routing.wholeKeySet);
+
+                            target.noRoutingMatch(mex, matchedRouting);
                             routed = true;
 
-                            __log.info("noRoutingMatch: Active instance found 
instanceID: {} and process pid {}",instance.getInstanceId(),_pid);
+                            __log.info("noRoutingMatch: Active instance found 
instanceID: {} and process pid 
{}",instance.getInstanceId(),instance.getProcess().getType());
+                         }
                         }
                     }
                 }
@@ -357,7 +363,7 @@ public class BpelProcess {
         return routed;
      }
 
-    private boolean isActive() {
+    protected boolean isActive() {
         return _pconf.getState() == 
org.apache.ode.bpel.iapi.ProcessState.ACTIVE;
     }
 

http://git-wip-us.apache.org/repos/asf/ode/blob/e11afe7b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
----------------------------------------------------------------------
diff --git 
a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java 
b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
index 84b0e4b..448b783 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
@@ -311,12 +311,15 @@ public class ProcessDaoImpl extends HibernateDao 
implements ProcessDAO, Deferred
         return _process.getGuid();
     }
 
-    @Override
+    /**
+     * Find instances across all versions of a process that match the 
correlation key and instance state.
+     */
     public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey, 
short processInstanceState) {
         entering("ProcessDaoImpl.findInstance");
         Query qry = 
getSession().getNamedQuery(HCorrelationSet.SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS);
         qry.setParameter("ckey", ckey.toCanonicalString());
-        qry.setEntity("process", getHibernateObj());
+        qry.setString("processTypeNamespace", _process.getTypeNamespace());
+        qry.setString("processTypeName", _process.getTypeName());
         qry.setShort("state", processInstanceState);
         Collection<HProcessInstance> resultList = qry.list();
 

http://git-wip-us.apache.org/repos/asf/ode/blob/e11afe7b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
----------------------------------------------------------------------
diff --git 
a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
 
b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
index 1656989..8fd7673 100644
--- 
a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
+++ 
b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
@@ -30,7 +30,7 @@ import java.util.Collection;
  * @hibernate.query name="SELECT_CORSETS_BY_PROCESS_STATES" query="from 
HCorrelationSet as c left join fetch c.process left join fetch c.instance where 
c.instance.state in (:states)"
  * @hibernate.query name="SELECT_INSTANCES_BY_CORSETS" query="select 
cs.scope.instance from HCorrelationSet as cs where cs.value = :ckey"
  * @hibernate.query name="SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS" 
query="select cs.scope.instance from HCorrelationSet as cs where cs.value = 
:ckey and
- *                          cs.process = :process and cs.instance.state = 
:state"
+ *                          cs.process.typeNamespace = :processTypeNamespace 
and cs.process.typeName = :processTypeName and cs.instance.state = :state"
  */
 public class HCorrelationSet extends HObject{
     public static final String SELECT_CORSET_IDS_BY_INSTANCES = 
"SELECT_CORSET_IDS_BY_INSTANCES";

http://git-wip-us.apache.org/repos/asf/ode/blob/e11afe7b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
----------------------------------------------------------------------
diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java 
b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
index f38cd0a..c6900f6 100644
--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
+++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
@@ -46,7 +46,7 @@ import java.util.List;
     @NamedQuery(name="InstanceByCKey", query="select 
cs._scope._processInstance from CorrelationSetDAOImpl as cs where 
cs._correlationKey = :ckey"),
     @NamedQuery(name="CorrelatorByKey", query="select c from CorrelatorDAOImpl 
as c where c._correlatorKey = :ckey and c._process = :process"),
     @NamedQuery(name="InstanceByCKeyProcessState", query="select 
cs._scope._processInstance from CorrelationSetDAOImpl as cs where 
cs._correlationKey = :ckey and "
-            + "cs._scope._processInstance._process = :process and 
cs._scope._processInstance._state = :state")
+            + "cs._scope._processInstance._process._processType = :processType 
and cs._scope._processInstance._state = :state")
 })
 public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO {
     private static final Logger __log = 
LoggerFactory.getLogger(ProcessDAOImpl.class);
@@ -229,13 +229,15 @@ public class ProcessDAOImpl extends OpenJPADAO implements 
ProcessDAO {
         return qry.getResultList();
     }
 
-    @Override
+    /**
+     * Find instances across all versions of a process that match the 
correlation key and instance state.
+     */
     public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey, 
short processInstanceState) {
         //need to make this query more efficient
         Query qry = getEM().createNamedQuery("InstanceByCKeyProcessState");
         qry.setParameter("ckey", ckey.toCanonicalString());
-        qry.setParameter("process", this);
-        qry.setParameter("state", ProcessState.STATE_ACTIVE);
+        qry.setParameter("processType", getType().toString());
+        qry.setParameter("state", processInstanceState);
         return qry.getResultList();
     }
 }

Reply via email to