Repository: ode
Updated Branches:
  refs/heads/master b85e5f93e -> 592e617cd


ODE-1049: Optimised findRoute and dequeueMessage queries to use the
pre-initialized CorrelationKeySet canonical value.


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/d4c3aa74
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/d4c3aa74
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/d4c3aa74

Branch: refs/heads/master
Commit: d4c3aa74016936f3b829d9ca6fdd872b261b76eb
Parents: 24d7f78
Author: sathwik <[email protected]>
Authored: Wed Jun 28 15:05:28 2017 +0530
Committer: sathwik <[email protected]>
Committed: Wed Jun 28 15:05:28 2017 +0530

----------------------------------------------------------------------
 ODE-1049.patch                                  | 379 +++++++++++++++++++
 .../org/apache/ode/bpel/dao/CorrelatorDAO.java  |  15 +
 .../ode/bpel/engine/BpelRuntimeContextImpl.java |   3 +-
 .../ode/bpel/memdao/CorrelatorDaoImpl.java      |  50 +--
 .../ode/daohib/bpel/CorrelatorDaoImpl.java      | 103 ++---
 .../apache/ode/dao/jpa/CorrelatorDAOImpl.java   |  78 ++--
 6 files changed, 519 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/d4c3aa74/ODE-1049.patch
----------------------------------------------------------------------
diff --git a/ODE-1049.patch b/ODE-1049.patch
new file mode 100644
index 0000000..7f790ae
--- /dev/null
+++ b/ODE-1049.patch
@@ -0,0 +1,379 @@
+commit 58b1a735ddb58c381e7ef356e8c70f3f6f71aa93
+Author: sathwik <[email protected]>
+Date:   Sat Mar 5 12:12:04 2016 +0530
+
+    ODE-1049: Optimised findRoute and dequeueMessage queries to use pre 
inistialized CorrelationKeySet cannonical value.
+
+diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java 
b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
+index 7a2556c0d..21cd8eb87 100644
+--- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
++++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
+@@ -83,9 +83,11 @@ public interface CorrelatorDAO {
+     Collection<CorrelatorMessageDAO> getAllMessages();
+ 
+   /**
++   * @deprecated
+    * Find a route matching the given correlation key.
+    * @param correlationKey correlation key
+    * @return route matching the given correlation key
++   * @see findRoute(CorrelationKeySet correlationKeySet,boolean 
isCorrleationKeySetPreInitialized)
+    */
+   List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet);
+ 
+@@ -115,4 +117,17 @@ public interface CorrelatorDAO {
+      * @return all routes registered on this correlator, use with care as it 
can potentially return a lot of values
+      */
+     Collection<MessageRouteDAO> getAllRoutes();
++
++    /**
++     * Find a route matching the given correlation key set.
++     * If the correlationKeySet is known to be pre initialized then 
isCorrleationKeySetPreInitialized can be set to true or false otherwise.
++     * Depending on the value of isCorrleationKeySetPreInitialized,
++     *  true -  canonical value of the correlationKeySet might be used to 
find the route.
++     *  false - canonical value of each of the subset of correlationKeySet 
might be used to find the route.
++     *
++     * @param correlationKeySet correlation key
++     * @param isCorrleationKeySetPreInitialized
++     * @return route matching the given correlation key
++     */
++    List<MessageRouteDAO> findRoute(CorrelationKeySet 
correlationKeySet,boolean isCorrleationKeySetPreInitialized);
+ }
+diff --git 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
+index aff7733f4..480094e13 100644
+--- 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
++++ 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
+@@ -1482,11 +1482,13 @@ public class BpelRuntimeContextImpl implements 
BpelRuntimeContext {
+         if (BpelProcess.__log.isDebugEnabled()) {
+             __log.debug("MatcherEvent handling: correlatorId=" + correlatorId 
+ ", ckeySet=" + ckeySet);
+         }
++
+         CorrelatorDAO correlator = 
_dao.getProcess().getCorrelator(correlatorId);
+ 
++
+         // Find the route first, this is a SELECT FOR UPDATE on the 
"selector" row,
+         // So we want to acquire the lock before we do anthing else.
+-        List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet);
++        List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet,true);
+         if (mroutes == null || mroutes.size() == 0) {
+             // Ok, this means that a message arrived before we did, so 
nothing to do.
+             __log.debug("MatcherEvent handling: nothing to do, route no 
longer in DB");
+@@ -1495,13 +1497,15 @@ public class BpelRuntimeContextImpl implements 
BpelRuntimeContext {
+ 
+         // Now see if there is a message that matches this selector.
+         MessageExchangeDAO mexdao = correlator.dequeueMessage(ckeySet);
++
+         if (mexdao != null) {
+             __log.debug("MatcherEvent handling: found matching message in DB 
(i.e. message arrived before <receive>)");
+-              if( 
MessageExchangePattern.REQUEST_RESPONSE.toString().equals(mexdao.getPattern())) 
{
+-                      __log.warn("A message arrived before a receive is ready 
for a request/response pattern. This may be processed to success. However, you 
should consider revising your process since a TCP port and a container thread 
will be held for a longer time and the process will not scale under heavy 
load.");
+-              }
+-              
++            if( 
MessageExchangePattern.REQUEST_RESPONSE.toString().equals(mexdao.getPattern())) 
{
++                __log.warn("A message arrived before a receive is ready for a 
request/response pattern. This may be processed to success. However, you should 
consider revising your process since a TCP port and a container thread will be 
held for a longer time and the process will not scale under heavy load.");
++            }
++
+             for (MessageRouteDAO mroute : mroutes) {
++                __log.debug("Removing routes for GroupID: {} Instance: 
{}",mroute.getGroupId(),_dao.getInstanceId());
+                 // We have a match, so we can get rid of the routing entries.
+                 correlator.removeRoutes(mroute.getGroupId(), _dao);
+             }
+diff --git 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
+index 5de76f2f8..f6261d47f 100644
+--- 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
++++ 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
+@@ -66,30 +66,7 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements 
CorrelatorDAO {
+     }
+ 
+     public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) {
+-        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+-
+-        assert keySet != null;
+-
+-        if (__log.isDebugEnabled()) {
+-            __log.debug("findRoute: keySet=" + keySet);
+-        }
+-        boolean routed = false;
+-        for (MessageRouteDaoImpl route : _routes) {
+-            assert route._ckeySet != null;
+-            
+-            if(keySet.isRoutableTo(route._ckeySet, 
"all".equals(route.getRoute()))) {
+-              if ("all".equals(route.getRoute()))  {
+-                      routes.add(route);
+-              } else {
+-                      if (!routed) {
+-                              routes.add(route);
+-                      }
+-                      routed = true;
+-              }
+-            }
+-        }
+-
+-        return routes;
++        return findRoute(keySet, false);
+     }
+ 
+     public String getCorrelatorId() {
+@@ -186,4 +163,29 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements 
CorrelatorDAO {
+         return true;
+     }
+ 
++    public List<MessageRouteDAO> findRoute(CorrelationKeySet 
correlationKeySet,boolean isCorrleationKeySetPreInitialized) {
++        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
++
++        assert correlationKeySet != null;
++        boolean routed = false;
++
++        __log.debug("findRoute: keySet={}",correlationKeySet);
++
++        for (MessageRouteDaoImpl route : _routes) {
++            assert route._ckeySet != null;
++
++            if(correlationKeySet.isRoutableTo(route._ckeySet, 
"all".equals(route.getRoute()))) {
++                if ("all".equals(route.getRoute()))  {
++                    routes.add(route);
++                } else {
++                    if (!routed) {
++                        routes.add(route);
++                    }
++                    routed = true;
++                }
++            }
++        }
++
++        return routes;
++    }
+ }
+diff --git 
a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java 
b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
+index 3f530cf4a..a467c2db1 100644
+--- 
a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
++++ 
b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
+@@ -56,7 +56,7 @@ class CorrelatorDaoImpl extends HibernateDao implements 
CorrelatorDAO {
+             "(select hc.id from HCorrelator as hc where hc.correlatorId = 
:correlatorId)").intern();
+ 
+     /** Query for removing routes. */
+-    private static final String QRY_DELSELECTORS = "delete from 
HCorrelatorSelector where groupId = ? and instance = ?";
++    private static final String QRY_DELSELECTORS = "delete from 
HCorrelatorSelector where instance = ? and groupId = ?";
+ 
+     private HCorrelator _hobj;
+ 
+@@ -75,12 +75,8 @@ class CorrelatorDaoImpl extends HibernateDao implements 
CorrelatorDAO {
+ 
+         __log.debug("dequeueMessage({}): ",keySet);
+ 
+-        List<CorrelationKeySet> subSets = keySet.findSubSets();
+-        Query qry = getSession().createFilter(_hobj.getMessageCorrelations(),
+-                generateUnmatchedQuery(subSets));
+-        for( int i = 0; i < subSets.size(); i++ ) {
+-            qry.setString("s" + i, subSets.get(i).toCanonicalString());
+-        }
++        Query qry = 
getSession().createFilter(_hobj.getMessageCorrelations()," where 
this.correlationKey = :s0");
++        qry.setString("s0", keySet.toCanonicalString());
+ 
+         // We really should consider the possibility of multiple messages 
matching a criteria.
+         // When the message is handled, its not too convenient to attempt to 
determine if the
+@@ -120,46 +116,7 @@ class CorrelatorDaoImpl extends HibernateDao implements 
CorrelatorDAO {
+ 
+     @SuppressWarnings("unchecked")
+     public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) {
+-        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+-
+-        entering("CorrelatorDaoImpl.findRoute");
+-        String hdr = "findRoute(keySet=" + keySet + "): ";
+-        if (__log.isDebugEnabled()) __log.debug(hdr);
+-
+-        //String processType = new 
QName(_hobj.getProcess().getTypeNamespace(), 
_hobj.getProcess().getTypeName()).toString();
+-        List<CorrelationKeySet> subSets = keySet.findSubSets();
+-
+-        //Query q = 
getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? 
FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
+-        Query q = 
getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets));
+-        q.setEntity("correlator", getHibernateObj());
+-
+-        for( int i = 0; i < subSets.size(); i++ ) {
+-            q.setString("s" + i, subSets.get(i).toCanonicalString());
+-        }
+-        // Make sure we obtain a lock for the selector we want to find.
+-        q.setLockMode("hs", LockMode.UPGRADE);
+-
+-        List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
+-        List<HCorrelatorSelector> list;
+-        try {
+-            list = (List<HCorrelatorSelector>) q.list();
+-        } catch (LockAcquisitionException e) {
+-            throw new Scheduler.JobProcessorException(e, true);
+-        }
+-        for (HCorrelatorSelector selector : list) {
+-            if (selector != null) {
+-                boolean isRoutePolicyOne = selector.getRoute() == null || 
"one".equals(selector.getRoute());
+-                if ("all".equals(selector.getRoute()) ||
+-                        (isRoutePolicyOne && 
!targets.contains(selector.getInstance()))) {
+-                    routes.add(new MessageRouteDaoImpl(_sm, selector));
+-                    targets.add(selector.getInstance());
+-                }
+-            }
+-        }
+-
+-        if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes);
+-
+-        return routes;
++        return findRoute(keySet,false);
+     }
+ 
+     private String generateUnmatchedQuery(List<CorrelationKeySet> subSets) {
+@@ -271,8 +228,8 @@ class CorrelatorDaoImpl extends HibernateDao implements 
CorrelatorDAO {
+         __log.debug(hdr);
+         Session session = getSession();
+         Query q = session.createQuery(QRY_DELSELECTORS);
+-        q.setString(0, routeGroupId); // groupId
+-        q.setEntity(1, ((ProcessInstanceDaoImpl) target).getHibernateObj()); 
// instance
++        q.setEntity(0, ((ProcessInstanceDaoImpl) target).getHibernateObj()); 
// instance
++        q.setString(1, routeGroupId); // groupId
+         int updates = q.executeUpdate();
+         session.flush(); // explicit flush to ensure route removed
+         if (__log.isDebugEnabled())
+@@ -293,4 +250,52 @@ class CorrelatorDaoImpl extends HibernateDao implements 
CorrelatorDAO {
+         return routes;
+     }
+ 
++    public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet,boolean 
isCorrleationKeySetPreInitialized) {
++        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
++
++        entering("CorrelatorDaoImpl.findRoute");
++        __log.debug("findRoute(keySet={})",keySet);
++
++        //Query q = 
getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? 
FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
++        Query q = null;
++        if(isCorrleationKeySetPreInitialized){
++            q = getSession().createQuery(FLTR_SELECTORS + " and 
hs.correlationKey = :s0");
++            q.setEntity("correlator", getHibernateObj());
++            q.setString("s0", keySet.toCanonicalString());
++        } else {
++            List<CorrelationKeySet> subSets = keySet.findSubSets();
++            q = 
getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets));
++            q.setEntity("correlator", getHibernateObj());
++
++            for( int i = 0; i < subSets.size(); i++ ) {
++                q.setString("s" + i, subSets.get(i).toCanonicalString());
++            }
++        }
++        // Make sure we obtain a lock for the selector we want to find.
++        q.setLockMode("hs", LockMode.UPGRADE);
++
++        List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
++        List<HCorrelatorSelector> list;
++        try {
++            list = (List<HCorrelatorSelector>) q.list();
++        } catch (LockAcquisitionException e) {
++            throw new Scheduler.JobProcessorException(e, true);
++        }
++        for (HCorrelatorSelector selector : list) {
++            __log.debug("selector returned form findRoute {} and targets {}", 
selector.getInstance().getId(),targets);
++            if (selector != null) {
++                boolean isRoutePolicyOne = selector.getRoute() == null || 
"one".equals(selector.getRoute());
++                if ("all".equals(selector.getRoute()) ||
++                        (isRoutePolicyOne && 
!targets.contains(selector.getInstance()))) {
++                    __log.debug("selector added for targets {}", 
selector.getInstance().getId());
++                    routes.add(new MessageRouteDaoImpl(_sm, selector));
++                    targets.add(selector.getInstance());
++                }
++            }
++        }
++
++        __log.debug("findRoute(keySet={}) found {}",keySet,routes);
++
++        return routes;
++    }
+ }
+diff --git 
a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java 
b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
+index 9f6fbce79..514fda8c1 100644
+--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
++++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
+@@ -101,41 +101,7 @@ public class CorrelatorDAOImpl extends OpenJPADAO 
implements CorrelatorDAO {
+ 
+     @SuppressWarnings("unchecked")
+     public List<MessageRouteDAO> findRoute(CorrelationKeySet 
correlationKeySet) {
+-        if (__log.isDebugEnabled()) {
+-            __log.debug("findRoute " + correlationKeySet);
+-        }
+-        List<CorrelationKeySet> subSets = correlationKeySet.findSubSets();
+-        Query qry = 
getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets));
+-        qry.setParameter("corr", this);
+-        for (int i = 0; i < subSets.size(); i++) {
+-            qry.setParameter("s" + i, subSets.get(i).toCanonicalString());
+-        }
+-
+-        List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) 
qry.getResultList();
+-        if (candidateRoutes.size() > 0) {
+-            List<MessageRouteDAO> matchingRoutes = new 
ArrayList<MessageRouteDAO>();
+-            boolean routed = false;
+-            for (int i = 0; i < candidateRoutes.size(); i++) {
+-                MessageRouteDAO route = candidateRoutes.get(i);
+-                if ("all".equals(route.getRoute())) {
+-                    matchingRoutes.add(route);
+-                } else {
+-                    if (!routed) {
+-                        matchingRoutes.add(route);
+-                    }
+-                    routed = true;
+-                }
+-            }
+-            if (__log.isDebugEnabled()) {
+-                __log.debug("findRoute found " + matchingRoutes);
+-            }
+-            return matchingRoutes;
+-        } else {
+-            if (__log.isDebugEnabled()) {
+-                __log.debug("findRoute found nothing");
+-            }
+-            return null;
+-        }
++        return findRoute(correlationKeySet, false);
+     }
+ 
+     private String generateSelectorQuery(String header, 
List<CorrelationKeySet> subSets) {
+@@ -199,4 +165,46 @@ public class CorrelatorDAOImpl extends OpenJPADAO 
implements CorrelatorDAO {
+         // TODO Auto-generated method stub
+         return true;
+     }
++
++    public List<MessageRouteDAO> findRoute(CorrelationKeySet 
correlationKeySet,boolean isCorrleationKeySetPreInitialized) {
++        __log.debug("findRoute {}", correlationKeySet);
++
++        Query qry = null;
++
++        if(isCorrleationKeySetPreInitialized){
++            qry = getEM().createQuery(ROUTE_BY_CKEY_HEADER + " and 
route._correlationKey = :s0");
++            qry.setParameter("corr", this);
++            qry.setParameter("s0", correlationKeySet.toCanonicalString());
++        } else {
++            List<CorrelationKeySet> subSets = correlationKeySet.findSubSets();
++            qry = 
getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets));
++            qry.setParameter("corr", this);
++            for (int i = 0; i < subSets.size(); i++) {
++                qry.setParameter("s" + i, subSets.get(i).toCanonicalString());
++            }
++        }
++
++        List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) 
qry.getResultList();
++        if (candidateRoutes.size() > 0) {
++            List<MessageRouteDAO> matchingRoutes = new 
ArrayList<MessageRouteDAO>();
++            boolean routed = false;
++            for (int i = 0; i < candidateRoutes.size(); i++) {
++                MessageRouteDAO route = candidateRoutes.get(i);
++                if ("all".equals(route.getRoute())) {
++                    matchingRoutes.add(route);
++                } else {
++                    if (!routed) {
++                        matchingRoutes.add(route);
++                    }
++                    routed = true;
++                }
++            }
++
++            __log.debug("findRoute found {}",matchingRoutes);
++            return matchingRoutes;
++        } else {
++            __log.debug("findRoute found nothing");
++            return null;
++        }
++    }
+ }

http://git-wip-us.apache.org/repos/asf/ode/blob/d4c3aa74/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
----------------------------------------------------------------------
diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java 
b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
index 2925d96..023a11d 100644
--- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
+++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
@@ -83,9 +83,11 @@ public interface CorrelatorDAO {
     Collection<CorrelatorMessageDAO> getAllMessages();
 
   /**
+   * @deprecated
    * Find a route matching the given correlation key.
    * @param correlationKey correlation key
    * @return route matching the given correlation key
+   * @see findRoute(CorrelationKeySet correlationKeySet,boolean 
isCorrleationKeySetPreInitialized)
    */
   List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet);
 
@@ -115,4 +117,17 @@ public interface CorrelatorDAO {
      * @return all routes registered on this correlator, use with care as it 
can potentially return a lot of values
      */
     Collection<MessageRouteDAO> getAllRoutes();
+
+    /**
+     * Find a route matching the given correlation key set.
+     * If the correlationKeySet is known to be pre initialized then 
isCorrleationKeySetPreInitialized can be set to true or false otherwise.
+     * Depending on the value of isCorrleationKeySetPreInitialized,
+     *  true -  canonical value of the correlationKeySet might be used to find 
the route.
+     *  false - canonical value of each of the subset of correlationKeySet 
might be used to find the route.
+     *
+     * @param correlationKeySet correlation key
+     * @param isCorrleationKeySetPreInitialized
+     * @return route matching the given correlation key
+     */
+    List<MessageRouteDAO> findRoute(CorrelationKeySet 
correlationKeySet,boolean isCorrleationKeySetPreInitialized);
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/d4c3aa74/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
----------------------------------------------------------------------
diff --git 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
index d2808df..f4c9fec 100644
--- 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
+++ 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
@@ -1429,7 +1429,7 @@ public class BpelRuntimeContextImpl implements 
BpelRuntimeContext {
 
         // Find the route first, this is a SELECT FOR UPDATE on the "selector" 
row,
         // So we want to acquire the lock before we do anthing else.
-        List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet);
+        List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet,true);
         if (mroutes == null || mroutes.size() == 0) {
             // Ok, this means that a message arrived before we did, so nothing 
to do.
             __log.debug("MatcherEvent handling: nothing to do, route no longer 
in DB");
@@ -1445,6 +1445,7 @@ public class BpelRuntimeContextImpl implements 
BpelRuntimeContext {
             }
 
             for (MessageRouteDAO mroute : mroutes) {
+                __log.debug("Removing routes for GroupID: {} Instance: 
{}",mroute.getGroupId(),_dao.getInstanceId());
                 // We have a match, so we can get rid of the routing entries.
                 correlator.removeRoutes(mroute.getGroupId(), _dao);
             }

http://git-wip-us.apache.org/repos/asf/ode/blob/d4c3aa74/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
----------------------------------------------------------------------
diff --git 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
index c090b53..f6261d4 100644
--- 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
+++ 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
@@ -66,30 +66,7 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements 
CorrelatorDAO {
     }
 
     public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) {
-        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
-
-        assert keySet != null;
-
-        if (__log.isDebugEnabled()) {
-            __log.debug("findRoute: keySet=" + keySet);
-        }
-        boolean routed = false;
-        for (MessageRouteDaoImpl route : _routes) {
-            assert route._ckeySet != null;
-
-            if(keySet.isRoutableTo(route._ckeySet, 
"all".equals(route.getRoute()))) {
-                if ("all".equals(route.getRoute()))  {
-                    routes.add(route);
-                } else {
-                    if (!routed) {
-                        routes.add(route);
-                    }
-                    routed = true;
-                }
-            }
-        }
-
-        return routes;
+        return findRoute(keySet, false);
     }
 
     public String getCorrelatorId() {
@@ -186,4 +163,29 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements 
CorrelatorDAO {
         return true;
     }
 
+    public List<MessageRouteDAO> findRoute(CorrelationKeySet 
correlationKeySet,boolean isCorrleationKeySetPreInitialized) {
+        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+
+        assert correlationKeySet != null;
+        boolean routed = false;
+
+        __log.debug("findRoute: keySet={}",correlationKeySet);
+
+        for (MessageRouteDaoImpl route : _routes) {
+            assert route._ckeySet != null;
+
+            if(correlationKeySet.isRoutableTo(route._ckeySet, 
"all".equals(route.getRoute()))) {
+                if ("all".equals(route.getRoute()))  {
+                    routes.add(route);
+                } else {
+                    if (!routed) {
+                        routes.add(route);
+                    }
+                    routed = true;
+                }
+            }
+        }
+
+        return routes;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/d4c3aa74/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
----------------------------------------------------------------------
diff --git 
a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java 
b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
index 3f530cf..a467c2d 100644
--- 
a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
+++ 
b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
@@ -56,7 +56,7 @@ class CorrelatorDaoImpl extends HibernateDao implements 
CorrelatorDAO {
             "(select hc.id from HCorrelator as hc where hc.correlatorId = 
:correlatorId)").intern();
 
     /** Query for removing routes. */
-    private static final String QRY_DELSELECTORS = "delete from 
HCorrelatorSelector where groupId = ? and instance = ?";
+    private static final String QRY_DELSELECTORS = "delete from 
HCorrelatorSelector where instance = ? and groupId = ?";
 
     private HCorrelator _hobj;
 
@@ -75,12 +75,8 @@ class CorrelatorDaoImpl extends HibernateDao implements 
CorrelatorDAO {
 
         __log.debug("dequeueMessage({}): ",keySet);
 
-        List<CorrelationKeySet> subSets = keySet.findSubSets();
-        Query qry = getSession().createFilter(_hobj.getMessageCorrelations(),
-                generateUnmatchedQuery(subSets));
-        for( int i = 0; i < subSets.size(); i++ ) {
-            qry.setString("s" + i, subSets.get(i).toCanonicalString());
-        }
+        Query qry = getSession().createFilter(_hobj.getMessageCorrelations()," 
where this.correlationKey = :s0");
+        qry.setString("s0", keySet.toCanonicalString());
 
         // We really should consider the possibility of multiple messages 
matching a criteria.
         // When the message is handled, its not too convenient to attempt to 
determine if the
@@ -120,46 +116,7 @@ class CorrelatorDaoImpl extends HibernateDao implements 
CorrelatorDAO {
 
     @SuppressWarnings("unchecked")
     public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) {
-        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
-
-        entering("CorrelatorDaoImpl.findRoute");
-        String hdr = "findRoute(keySet=" + keySet + "): ";
-        if (__log.isDebugEnabled()) __log.debug(hdr);
-
-        //String processType = new 
QName(_hobj.getProcess().getTypeNamespace(), 
_hobj.getProcess().getTypeName()).toString();
-        List<CorrelationKeySet> subSets = keySet.findSubSets();
-
-        //Query q = 
getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? 
FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
-        Query q = 
getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets));
-        q.setEntity("correlator", getHibernateObj());
-
-        for( int i = 0; i < subSets.size(); i++ ) {
-            q.setString("s" + i, subSets.get(i).toCanonicalString());
-        }
-        // Make sure we obtain a lock for the selector we want to find.
-        q.setLockMode("hs", LockMode.UPGRADE);
-
-        List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
-        List<HCorrelatorSelector> list;
-        try {
-            list = (List<HCorrelatorSelector>) q.list();
-        } catch (LockAcquisitionException e) {
-            throw new Scheduler.JobProcessorException(e, true);
-        }
-        for (HCorrelatorSelector selector : list) {
-            if (selector != null) {
-                boolean isRoutePolicyOne = selector.getRoute() == null || 
"one".equals(selector.getRoute());
-                if ("all".equals(selector.getRoute()) ||
-                        (isRoutePolicyOne && 
!targets.contains(selector.getInstance()))) {
-                    routes.add(new MessageRouteDaoImpl(_sm, selector));
-                    targets.add(selector.getInstance());
-                }
-            }
-        }
-
-        if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes);
-
-        return routes;
+        return findRoute(keySet,false);
     }
 
     private String generateUnmatchedQuery(List<CorrelationKeySet> subSets) {
@@ -271,8 +228,8 @@ class CorrelatorDaoImpl extends HibernateDao implements 
CorrelatorDAO {
         __log.debug(hdr);
         Session session = getSession();
         Query q = session.createQuery(QRY_DELSELECTORS);
-        q.setString(0, routeGroupId); // groupId
-        q.setEntity(1, ((ProcessInstanceDaoImpl) target).getHibernateObj()); 
// instance
+        q.setEntity(0, ((ProcessInstanceDaoImpl) target).getHibernateObj()); 
// instance
+        q.setString(1, routeGroupId); // groupId
         int updates = q.executeUpdate();
         session.flush(); // explicit flush to ensure route removed
         if (__log.isDebugEnabled())
@@ -293,4 +250,52 @@ class CorrelatorDaoImpl extends HibernateDao implements 
CorrelatorDAO {
         return routes;
     }
 
+    public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet,boolean 
isCorrleationKeySetPreInitialized) {
+        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+
+        entering("CorrelatorDaoImpl.findRoute");
+        __log.debug("findRoute(keySet={})",keySet);
+
+        //Query q = 
getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? 
FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
+        Query q = null;
+        if(isCorrleationKeySetPreInitialized){
+            q = getSession().createQuery(FLTR_SELECTORS + " and 
hs.correlationKey = :s0");
+            q.setEntity("correlator", getHibernateObj());
+            q.setString("s0", keySet.toCanonicalString());
+        } else {
+            List<CorrelationKeySet> subSets = keySet.findSubSets();
+            q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, 
subSets));
+            q.setEntity("correlator", getHibernateObj());
+
+            for( int i = 0; i < subSets.size(); i++ ) {
+                q.setString("s" + i, subSets.get(i).toCanonicalString());
+            }
+        }
+        // Make sure we obtain a lock for the selector we want to find.
+        q.setLockMode("hs", LockMode.UPGRADE);
+
+        List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
+        List<HCorrelatorSelector> list;
+        try {
+            list = (List<HCorrelatorSelector>) q.list();
+        } catch (LockAcquisitionException e) {
+            throw new Scheduler.JobProcessorException(e, true);
+        }
+        for (HCorrelatorSelector selector : list) {
+            __log.debug("selector returned form findRoute {} and targets {}", 
selector.getInstance().getId(),targets);
+            if (selector != null) {
+                boolean isRoutePolicyOne = selector.getRoute() == null || 
"one".equals(selector.getRoute());
+                if ("all".equals(selector.getRoute()) ||
+                        (isRoutePolicyOne && 
!targets.contains(selector.getInstance()))) {
+                    __log.debug("selector added for targets {}", 
selector.getInstance().getId());
+                    routes.add(new MessageRouteDaoImpl(_sm, selector));
+                    targets.add(selector.getInstance());
+                }
+            }
+        }
+
+        __log.debug("findRoute(keySet={}) found {}",keySet,routes);
+
+        return routes;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/d4c3aa74/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
----------------------------------------------------------------------
diff --git 
a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java 
b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
index 9f6fbce..514fda8 100644
--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
+++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
@@ -101,41 +101,7 @@ public class CorrelatorDAOImpl extends OpenJPADAO 
implements CorrelatorDAO {
 
     @SuppressWarnings("unchecked")
     public List<MessageRouteDAO> findRoute(CorrelationKeySet 
correlationKeySet) {
-        if (__log.isDebugEnabled()) {
-            __log.debug("findRoute " + correlationKeySet);
-        }
-        List<CorrelationKeySet> subSets = correlationKeySet.findSubSets();
-        Query qry = 
getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets));
-        qry.setParameter("corr", this);
-        for (int i = 0; i < subSets.size(); i++) {
-            qry.setParameter("s" + i, subSets.get(i).toCanonicalString());
-        }
-
-        List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) 
qry.getResultList();
-        if (candidateRoutes.size() > 0) {
-            List<MessageRouteDAO> matchingRoutes = new 
ArrayList<MessageRouteDAO>();
-            boolean routed = false;
-            for (int i = 0; i < candidateRoutes.size(); i++) {
-                MessageRouteDAO route = candidateRoutes.get(i);
-                if ("all".equals(route.getRoute())) {
-                    matchingRoutes.add(route);
-                } else {
-                    if (!routed) {
-                        matchingRoutes.add(route);
-                    }
-                    routed = true;
-                }
-            }
-            if (__log.isDebugEnabled()) {
-                __log.debug("findRoute found " + matchingRoutes);
-            }
-            return matchingRoutes;
-        } else {
-            if (__log.isDebugEnabled()) {
-                __log.debug("findRoute found nothing");
-            }
-            return null;
-        }
+        return findRoute(correlationKeySet, false);
     }
 
     private String generateSelectorQuery(String header, 
List<CorrelationKeySet> subSets) {
@@ -199,4 +165,46 @@ public class CorrelatorDAOImpl extends OpenJPADAO 
implements CorrelatorDAO {
         // TODO Auto-generated method stub
         return true;
     }
+
+    /**
+     * Finds the routes of this correlator that match the given correlation key
+     * set.
+     *
+     * <p>With {@code isCorrleationKeySetPreInitialized} set, the canonical string
+     * of {@code correlationKeySet} is matched directly against the route's
+     * correlation key. Otherwise the query is generated from every sub-set
+     * returned by {@link CorrelationKeySet#findSubSets()}.
+     *
+     * <p>Every candidate whose route policy is {@code "all"} is returned; of the
+     * remaining candidates only the first one encountered is returned.
+     *
+     * @param correlationKeySet the correlation key set to look up
+     * @param isCorrleationKeySetPreInitialized whether to match on the canonical
+     *        value of {@code correlationKeySet} only
+     * @return the matching routes, or {@code null} when the query yields no
+     *         candidates
+     */
+    public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) {
+        __log.debug("findRoute {}", correlationKeySet);
+
+        final Query qry;
+        if (!isCorrleationKeySetPreInitialized) {
+            // General case: one named parameter (s0, s1, ...) per key sub-set.
+            List<CorrelationKeySet> keySubSets = correlationKeySet.findSubSets();
+            qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, keySubSets));
+            qry.setParameter("corr", this);
+            int position = 0;
+            for (CorrelationKeySet subSet : keySubSets) {
+                qry.setParameter("s" + position, subSet.toCanonicalString());
+                position++;
+            }
+        } else {
+            // Fast path: single equality match on the canonical key value.
+            qry = getEM().createQuery(ROUTE_BY_CKEY_HEADER + " and route._correlationKey = :s0");
+            qry.setParameter("corr", this);
+            qry.setParameter("s0", correlationKeySet.toCanonicalString());
+        }
+
+        List<MessageRouteDAO> candidates = (List<MessageRouteDAO>) qry.getResultList();
+        if (candidates.isEmpty()) {
+            __log.debug("findRoute found nothing");
+            return null;
+        }
+
+        List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>();
+        // Becomes true once the first non-"all" candidate has been seen.
+        boolean routed = false;
+        for (MessageRouteDAO candidate : candidates) {
+            if ("all".equals(candidate.getRoute())) {
+                matchingRoutes.add(candidate);
+                continue;
+            }
+            if (!routed) {
+                matchingRoutes.add(candidate);
+                routed = true;
+            }
+        }
+
+        __log.debug("findRoute found {}",matchingRoutes);
+        return matchingRoutes;
+    }
 }

Reply via email to