Repository: ode Updated Branches: refs/heads/master b85e5f93e -> 592e617cd
ODE-1049: Optimised findRoute and dequeueMessage queries to use pre inistialized CorrelationKeySet cannonical value. Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/d4c3aa74 Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/d4c3aa74 Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/d4c3aa74 Branch: refs/heads/master Commit: d4c3aa74016936f3b829d9ca6fdd872b261b76eb Parents: 24d7f78 Author: sathwik <[email protected]> Authored: Wed Jun 28 15:05:28 2017 +0530 Committer: sathwik <[email protected]> Committed: Wed Jun 28 15:05:28 2017 +0530 ---------------------------------------------------------------------- ODE-1049.patch | 379 +++++++++++++++++++ .../org/apache/ode/bpel/dao/CorrelatorDAO.java | 15 + .../ode/bpel/engine/BpelRuntimeContextImpl.java | 3 +- .../ode/bpel/memdao/CorrelatorDaoImpl.java | 50 +-- .../ode/daohib/bpel/CorrelatorDaoImpl.java | 103 ++--- .../apache/ode/dao/jpa/CorrelatorDAOImpl.java | 78 ++-- 6 files changed, 519 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/d4c3aa74/ODE-1049.patch ---------------------------------------------------------------------- diff --git a/ODE-1049.patch b/ODE-1049.patch new file mode 100644 index 0000000..7f790ae --- /dev/null +++ b/ODE-1049.patch @@ -0,0 +1,379 @@ +commit 58b1a735ddb58c381e7ef356e8c70f3f6f71aa93 +Author: sathwik <[email protected]> +Date: Sat Mar 5 12:12:04 2016 +0530 + + ODE-1049: Optimised findRoute and dequeueMessage queries to use pre inistialized CorrelationKeySet cannonical value. + +diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java +index 7a2556c0d..21cd8eb87 100644 +--- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java ++++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java +@@ -83,9 +83,11 @@ public interface CorrelatorDAO { + Collection<CorrelatorMessageDAO> getAllMessages(); + + /** ++ * @deprecated + * Find a route matching the given correlation key. + * @param correlationKey correlation key + * @return route matching the given correlation key ++ * @see findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) + */ + List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet); + +@@ -115,4 +117,17 @@ public interface CorrelatorDAO { + * @return all routes registered on this correlator, use with care as it can potentially return a lot of values + */ + Collection<MessageRouteDAO> getAllRoutes(); ++ ++ /** ++ * Find a route matching the given correlation key set. ++ * If the correlationKeySet is known to be pre initialized then isCorrleationKeySetPreInitialized can be set to true or false otherwise. ++ * Depending on the value of isCorrleationKeySetPreInitialized, ++ * true - canonical value of the correlationKeySet might be used to find the route. ++ * false - canonical value of each of the subset of correlationKeySet might be used to find the route. ++ * ++ * @param correlationKeySet correlation key ++ * @param isCorrleationKeySetPreInitialized ++ * @return route matching the given correlation key ++ */ ++ List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized); + } +diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java +index aff7733f4..480094e13 100644 +--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java ++++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java +@@ -1482,11 +1482,13 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext { + if (BpelProcess.__log.isDebugEnabled()) { + __log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckeySet=" + ckeySet); + } ++ + CorrelatorDAO correlator = _dao.getProcess().getCorrelator(correlatorId); + ++ + // Find the route first, this is a SELECT FOR UPDATE on the "selector" row, + // So we want to acquire the lock before we do anthing else. +- List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet); ++ List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet,true); + if (mroutes == null || mroutes.size() == 0) { + // Ok, this means that a message arrived before we did, so nothing to do. + __log.debug("MatcherEvent handling: nothing to do, route no longer in DB"); +@@ -1495,13 +1497,15 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext { + + // Now see if there is a message that matches this selector. + MessageExchangeDAO mexdao = correlator.dequeueMessage(ckeySet); ++ + if (mexdao != null) { + __log.debug("MatcherEvent handling: found matching message in DB (i.e. message arrived before <receive>)"); +- if( MessageExchangePattern.REQUEST_RESPONSE.toString().equals(mexdao.getPattern())) { +- __log.warn("A message arrived before a receive is ready for a request/response pattern. This may be processed to success. However, you should consider revising your process since a TCP port and a container thread will be held for a longer time and the process will not scale under heavy load."); +- } +- ++ if( MessageExchangePattern.REQUEST_RESPONSE.toString().equals(mexdao.getPattern())) { ++ __log.warn("A message arrived before a receive is ready for a request/response pattern. This may be processed to success. However, you should consider revising your process since a TCP port and a container thread will be held for a longer time and the process will not scale under heavy load."); ++ } ++ + for (MessageRouteDAO mroute : mroutes) { ++ __log.debug("Removing routes for GroupID: {} Instance: {}",mroute.getGroupId(),_dao.getInstanceId()); + // We have a match, so we can get rid of the routing entries. + correlator.removeRoutes(mroute.getGroupId(), _dao); + } +diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java +index 5de76f2f8..f6261d47f 100644 +--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java ++++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java +@@ -66,30 +66,7 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements CorrelatorDAO { + } + + public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) { +- List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); +- +- assert keySet != null; +- +- if (__log.isDebugEnabled()) { +- __log.debug("findRoute: keySet=" + keySet); +- } +- boolean routed = false; +- for (MessageRouteDaoImpl route : _routes) { +- assert route._ckeySet != null; +- +- if(keySet.isRoutableTo(route._ckeySet, "all".equals(route.getRoute()))) { +- if ("all".equals(route.getRoute())) { +- routes.add(route); +- } else { +- if (!routed) { +- routes.add(route); +- } +- routed = true; +- } +- } +- } +- +- return routes; ++ return findRoute(keySet, false); + } + + public String getCorrelatorId() { +@@ -186,4 +163,29 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements CorrelatorDAO { + return true; + } + ++ public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) { ++ List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); ++ ++ assert correlationKeySet != null; ++ boolean routed = false; ++ ++ __log.debug("findRoute: keySet={}",correlationKeySet); ++ ++ for (MessageRouteDaoImpl route : _routes) { ++ assert route._ckeySet != null; ++ ++ if(correlationKeySet.isRoutableTo(route._ckeySet, "all".equals(route.getRoute()))) { ++ if ("all".equals(route.getRoute())) { ++ routes.add(route); ++ } else { ++ if (!routed) { ++ routes.add(route); ++ } ++ routed = true; ++ } ++ } ++ } ++ ++ return routes; ++ } + } +diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java +index 3f530cf4a..a467c2db1 100644 +--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java ++++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java +@@ -56,7 +56,7 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { + "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId)").intern(); + + /** Query for removing routes. */ +- private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where groupId = ? and instance = ?"; ++ private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where instance = ? and groupId = ?"; + + private HCorrelator _hobj; + +@@ -75,12 +75,8 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { + + __log.debug("dequeueMessage({}): ",keySet); + +- List<CorrelationKeySet> subSets = keySet.findSubSets(); +- Query qry = getSession().createFilter(_hobj.getMessageCorrelations(), +- generateUnmatchedQuery(subSets)); +- for( int i = 0; i < subSets.size(); i++ ) { +- qry.setString("s" + i, subSets.get(i).toCanonicalString()); +- } ++ Query qry = getSession().createFilter(_hobj.getMessageCorrelations()," where this.correlationKey = :s0"); ++ qry.setString("s0", keySet.toCanonicalString()); + + // We really should consider the possibility of multiple messages matching a criteria. + // When the message is handled, its not too convenient to attempt to determine if the +@@ -120,46 +116,7 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { + + @SuppressWarnings("unchecked") + public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) { +- List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); +- +- entering("CorrelatorDaoImpl.findRoute"); +- String hdr = "findRoute(keySet=" + keySet + "): "; +- if (__log.isDebugEnabled()) __log.debug(hdr); +- +- //String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString(); +- List<CorrelationKeySet> subSets = keySet.findSubSets(); +- +- //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets)); +- Query q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets)); +- q.setEntity("correlator", getHibernateObj()); +- +- for( int i = 0; i < subSets.size(); i++ ) { +- q.setString("s" + i, subSets.get(i).toCanonicalString()); +- } +- // Make sure we obtain a lock for the selector we want to find. +- q.setLockMode("hs", LockMode.UPGRADE); +- +- List<HProcessInstance> targets = new ArrayList<HProcessInstance>(); +- List<HCorrelatorSelector> list; +- try { +- list = (List<HCorrelatorSelector>) q.list(); +- } catch (LockAcquisitionException e) { +- throw new Scheduler.JobProcessorException(e, true); +- } +- for (HCorrelatorSelector selector : list) { +- if (selector != null) { +- boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute()); +- if ("all".equals(selector.getRoute()) || +- (isRoutePolicyOne && !targets.contains(selector.getInstance()))) { +- routes.add(new MessageRouteDaoImpl(_sm, selector)); +- targets.add(selector.getInstance()); +- } +- } +- } +- +- if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes); +- +- return routes; ++ return findRoute(keySet,false); + } + + private String generateUnmatchedQuery(List<CorrelationKeySet> subSets) { +@@ -271,8 +228,8 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { + __log.debug(hdr); + Session session = getSession(); + Query q = session.createQuery(QRY_DELSELECTORS); +- q.setString(0, routeGroupId); // groupId +- q.setEntity(1, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance ++ q.setEntity(0, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance ++ q.setString(1, routeGroupId); // groupId + int updates = q.executeUpdate(); + session.flush(); // explicit flush to ensure route removed + if (__log.isDebugEnabled()) +@@ -293,4 +250,52 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { + return routes; + } + ++ public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet,boolean isCorrleationKeySetPreInitialized) { ++ List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); ++ ++ entering("CorrelatorDaoImpl.findRoute"); ++ __log.debug("findRoute(keySet={})",keySet); ++ ++ //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets)); ++ Query q = null; ++ if(isCorrleationKeySetPreInitialized){ ++ q = getSession().createQuery(FLTR_SELECTORS + " and hs.correlationKey = :s0"); ++ q.setEntity("correlator", getHibernateObj()); ++ q.setString("s0", keySet.toCanonicalString()); ++ } else { ++ List<CorrelationKeySet> subSets = keySet.findSubSets(); ++ q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets)); ++ q.setEntity("correlator", getHibernateObj()); ++ ++ for( int i = 0; i < subSets.size(); i++ ) { ++ q.setString("s" + i, subSets.get(i).toCanonicalString()); ++ } ++ } ++ // Make sure we obtain a lock for the selector we want to find. ++ q.setLockMode("hs", LockMode.UPGRADE); ++ ++ List<HProcessInstance> targets = new ArrayList<HProcessInstance>(); ++ List<HCorrelatorSelector> list; ++ try { ++ list = (List<HCorrelatorSelector>) q.list(); ++ } catch (LockAcquisitionException e) { ++ throw new Scheduler.JobProcessorException(e, true); ++ } ++ for (HCorrelatorSelector selector : list) { ++ __log.debug("selector returned form findRoute {} and targets {}", selector.getInstance().getId(),targets); ++ if (selector != null) { ++ boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute()); ++ if ("all".equals(selector.getRoute()) || ++ (isRoutePolicyOne && !targets.contains(selector.getInstance()))) { ++ __log.debug("selector added for targets {}", selector.getInstance().getId()); ++ routes.add(new MessageRouteDaoImpl(_sm, selector)); ++ targets.add(selector.getInstance()); ++ } ++ } ++ } ++ ++ __log.debug("findRoute(keySet={}) found {}",keySet,routes); ++ ++ return routes; ++ } + } +diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java +index 9f6fbce79..514fda8c1 100644 +--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java ++++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java +@@ -101,41 +101,7 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO { + + @SuppressWarnings("unchecked") + public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet) { +- if (__log.isDebugEnabled()) { +- __log.debug("findRoute " + correlationKeySet); +- } +- List<CorrelationKeySet> subSets = correlationKeySet.findSubSets(); +- Query qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets)); +- qry.setParameter("corr", this); +- for (int i = 0; i < subSets.size(); i++) { +- qry.setParameter("s" + i, subSets.get(i).toCanonicalString()); +- } +- +- List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) qry.getResultList(); +- if (candidateRoutes.size() > 0) { +- List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>(); +- boolean routed = false; +- for (int i = 0; i < candidateRoutes.size(); i++) { +- MessageRouteDAO route = candidateRoutes.get(i); +- if ("all".equals(route.getRoute())) { +- matchingRoutes.add(route); +- } else { +- if (!routed) { +- matchingRoutes.add(route); +- } +- routed = true; +- } +- } +- if (__log.isDebugEnabled()) { +- __log.debug("findRoute found " + matchingRoutes); +- } +- return matchingRoutes; +- } else { +- if (__log.isDebugEnabled()) { +- __log.debug("findRoute found nothing"); +- } +- return null; +- } ++ return findRoute(correlationKeySet, false); + } + + private String generateSelectorQuery(String header, List<CorrelationKeySet> subSets) { +@@ -199,4 +165,46 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO { + // TODO Auto-generated method stub + return true; + } ++ ++ public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) { ++ __log.debug("findRoute {}", correlationKeySet); ++ ++ Query qry = null; ++ ++ if(isCorrleationKeySetPreInitialized){ ++ qry = getEM().createQuery(ROUTE_BY_CKEY_HEADER + " and route._correlationKey = :s0"); ++ qry.setParameter("corr", this); ++ qry.setParameter("s0", correlationKeySet.toCanonicalString()); ++ } else { ++ List<CorrelationKeySet> subSets = correlationKeySet.findSubSets(); ++ qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets)); ++ qry.setParameter("corr", this); ++ for (int i = 0; i < subSets.size(); i++) { ++ qry.setParameter("s" + i, subSets.get(i).toCanonicalString()); ++ } ++ } ++ ++ List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) qry.getResultList(); ++ if (candidateRoutes.size() > 0) { ++ List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>(); ++ boolean routed = false; ++ for (int i = 0; i < candidateRoutes.size(); i++) { ++ MessageRouteDAO route = candidateRoutes.get(i); ++ if ("all".equals(route.getRoute())) { ++ matchingRoutes.add(route); ++ } else { ++ if (!routed) { ++ matchingRoutes.add(route); ++ } ++ routed = true; ++ } ++ } ++ ++ __log.debug("findRoute found {}",matchingRoutes); ++ return matchingRoutes; ++ } else { ++ __log.debug("findRoute found nothing"); ++ return null; ++ } ++ } + } http://git-wip-us.apache.org/repos/asf/ode/blob/d4c3aa74/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java ---------------------------------------------------------------------- diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java index 2925d96..023a11d 100644 --- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java +++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java @@ -83,9 +83,11 @@ public interface CorrelatorDAO { Collection<CorrelatorMessageDAO> getAllMessages(); /** + * @deprecated * Find a route matching the given correlation key. * @param correlationKey correlation key * @return route matching the given correlation key + * @see findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) */ List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet); @@ -115,4 +117,17 @@ public interface CorrelatorDAO { * @return all routes registered on this correlator, use with care as it can potentially return a lot of values */ Collection<MessageRouteDAO> getAllRoutes(); + + /** + * Find a route matching the given correlation key set. + * If the correlationKeySet is known to be pre initialized then isCorrleationKeySetPreInitialized can be set to true or false otherwise. + * Depending on the value of isCorrleationKeySetPreInitialized, + * true - canonical value of the correlationKeySet might be used to find the route. + * false - canonical value of each of the subset of correlationKeySet might be used to find the route. + * + * @param correlationKeySet correlation key + * @param isCorrleationKeySetPreInitialized + * @return route matching the given correlation key + */ + List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized); } http://git-wip-us.apache.org/repos/asf/ode/blob/d4c3aa74/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java index d2808df..f4c9fec 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java @@ -1429,7 +1429,7 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext { // Find the route first, this is a SELECT FOR UPDATE on the "selector" row, // So we want to acquire the lock before we do anthing else. - List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet); + List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet,true); if (mroutes == null || mroutes.size() == 0) { // Ok, this means that a message arrived before we did, so nothing to do. __log.debug("MatcherEvent handling: nothing to do, route no longer in DB"); @@ -1445,6 +1445,7 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext { } for (MessageRouteDAO mroute : mroutes) { + __log.debug("Removing routes for GroupID: {} Instance: {}",mroute.getGroupId(),_dao.getInstanceId()); // We have a match, so we can get rid of the routing entries. correlator.removeRoutes(mroute.getGroupId(), _dao); } http://git-wip-us.apache.org/repos/asf/ode/blob/d4c3aa74/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java index c090b53..f6261d4 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java @@ -66,30 +66,7 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements CorrelatorDAO { } public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) { - List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); - - assert keySet != null; - - if (__log.isDebugEnabled()) { - __log.debug("findRoute: keySet=" + keySet); - } - boolean routed = false; - for (MessageRouteDaoImpl route : _routes) { - assert route._ckeySet != null; - - if(keySet.isRoutableTo(route._ckeySet, "all".equals(route.getRoute()))) { - if ("all".equals(route.getRoute())) { - routes.add(route); - } else { - if (!routed) { - routes.add(route); - } - routed = true; - } - } - } - - return routes; + return findRoute(keySet, false); } public String getCorrelatorId() { @@ -186,4 +163,29 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements CorrelatorDAO { return true; } + public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) { + List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); + + assert correlationKeySet != null; + boolean routed = false; + + __log.debug("findRoute: keySet={}",correlationKeySet); + + for (MessageRouteDaoImpl route : _routes) { + assert route._ckeySet != null; + + if(correlationKeySet.isRoutableTo(route._ckeySet, "all".equals(route.getRoute()))) { + if ("all".equals(route.getRoute())) { + routes.add(route); + } else { + if (!routed) { + routes.add(route); + } + routed = true; + } + } + } + + return routes; + } } http://git-wip-us.apache.org/repos/asf/ode/blob/d4c3aa74/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java ---------------------------------------------------------------------- diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java index 3f530cf..a467c2d 100644 --- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java +++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java @@ -56,7 +56,7 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId)").intern(); /** Query for removing routes. */ - private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where groupId = ? and instance = ?"; + private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where instance = ? and groupId = ?"; private HCorrelator _hobj; @@ -75,12 +75,8 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { __log.debug("dequeueMessage({}): ",keySet); - List<CorrelationKeySet> subSets = keySet.findSubSets(); - Query qry = getSession().createFilter(_hobj.getMessageCorrelations(), - generateUnmatchedQuery(subSets)); - for( int i = 0; i < subSets.size(); i++ ) { - qry.setString("s" + i, subSets.get(i).toCanonicalString()); - } + Query qry = getSession().createFilter(_hobj.getMessageCorrelations()," where this.correlationKey = :s0"); + qry.setString("s0", keySet.toCanonicalString()); // We really should consider the possibility of multiple messages matching a criteria. // When the message is handled, its not too convenient to attempt to determine if the @@ -120,46 +116,7 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { @SuppressWarnings("unchecked") public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) { - List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); - - entering("CorrelatorDaoImpl.findRoute"); - String hdr = "findRoute(keySet=" + keySet + "): "; - if (__log.isDebugEnabled()) __log.debug(hdr); - - //String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString(); - List<CorrelationKeySet> subSets = keySet.findSubSets(); - - //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets)); - Query q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets)); - q.setEntity("correlator", getHibernateObj()); - - for( int i = 0; i < subSets.size(); i++ ) { - q.setString("s" + i, subSets.get(i).toCanonicalString()); - } - // Make sure we obtain a lock for the selector we want to find. - q.setLockMode("hs", LockMode.UPGRADE); - - List<HProcessInstance> targets = new ArrayList<HProcessInstance>(); - List<HCorrelatorSelector> list; - try { - list = (List<HCorrelatorSelector>) q.list(); - } catch (LockAcquisitionException e) { - throw new Scheduler.JobProcessorException(e, true); - } - for (HCorrelatorSelector selector : list) { - if (selector != null) { - boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute()); - if ("all".equals(selector.getRoute()) || - (isRoutePolicyOne && !targets.contains(selector.getInstance()))) { - routes.add(new MessageRouteDaoImpl(_sm, selector)); - targets.add(selector.getInstance()); - } - } - } - - if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes); - - return routes; + return findRoute(keySet,false); } private String generateUnmatchedQuery(List<CorrelationKeySet> subSets) { @@ -271,8 +228,8 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { __log.debug(hdr); Session session = getSession(); Query q = session.createQuery(QRY_DELSELECTORS); - q.setString(0, routeGroupId); // groupId - q.setEntity(1, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance + q.setEntity(0, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance + q.setString(1, routeGroupId); // groupId int updates = q.executeUpdate(); session.flush(); // explicit flush to ensure route removed if (__log.isDebugEnabled()) @@ -293,4 +250,52 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { return routes; } + public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet,boolean isCorrleationKeySetPreInitialized) { + List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); + + entering("CorrelatorDaoImpl.findRoute"); + __log.debug("findRoute(keySet={})",keySet); + + //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets)); + Query q = null; + if(isCorrleationKeySetPreInitialized){ + q = getSession().createQuery(FLTR_SELECTORS + " and hs.correlationKey = :s0"); + q.setEntity("correlator", getHibernateObj()); + q.setString("s0", keySet.toCanonicalString()); + } else { + List<CorrelationKeySet> subSets = keySet.findSubSets(); + q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets)); + q.setEntity("correlator", getHibernateObj()); + + for( int i = 0; i < subSets.size(); i++ ) { + q.setString("s" + i, subSets.get(i).toCanonicalString()); + } + } + // Make sure we obtain a lock for the selector we want to find. + q.setLockMode("hs", LockMode.UPGRADE); + + List<HProcessInstance> targets = new ArrayList<HProcessInstance>(); + List<HCorrelatorSelector> list; + try { + list = (List<HCorrelatorSelector>) q.list(); + } catch (LockAcquisitionException e) { + throw new Scheduler.JobProcessorException(e, true); + } + for (HCorrelatorSelector selector : list) { + __log.debug("selector returned form findRoute {} and targets {}", selector.getInstance().getId(),targets); + if (selector != null) { + boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute()); + if ("all".equals(selector.getRoute()) || + (isRoutePolicyOne && !targets.contains(selector.getInstance()))) { + __log.debug("selector added for targets {}", selector.getInstance().getId()); + routes.add(new MessageRouteDaoImpl(_sm, selector)); + targets.add(selector.getInstance()); + } + } + } + + __log.debug("findRoute(keySet={}) found {}",keySet,routes); + + return routes; + } } http://git-wip-us.apache.org/repos/asf/ode/blob/d4c3aa74/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java ---------------------------------------------------------------------- diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java index 9f6fbce..514fda8 100644 --- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java +++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java @@ -101,41 +101,7 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO { @SuppressWarnings("unchecked") public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet) { - if (__log.isDebugEnabled()) { - __log.debug("findRoute " + correlationKeySet); - } - List<CorrelationKeySet> subSets = correlationKeySet.findSubSets(); - Query qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets)); - qry.setParameter("corr", this); - for (int i = 0; i < subSets.size(); i++) { - qry.setParameter("s" + i, subSets.get(i).toCanonicalString()); - } - - List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) qry.getResultList(); - if (candidateRoutes.size() > 0) { - List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>(); - boolean routed = false; - for (int i = 0; i < candidateRoutes.size(); i++) { - MessageRouteDAO route = candidateRoutes.get(i); - if ("all".equals(route.getRoute())) { - matchingRoutes.add(route); - } else { - if (!routed) { - matchingRoutes.add(route); - } - routed = true; - } - } - if (__log.isDebugEnabled()) { - __log.debug("findRoute found " + matchingRoutes); - } - return matchingRoutes; - } else { - if (__log.isDebugEnabled()) { - __log.debug("findRoute found nothing"); - } - return null; - } + return findRoute(correlationKeySet, false); } private String generateSelectorQuery(String header, List<CorrelationKeySet> subSets) { @@ -199,4 +165,46 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO { // TODO Auto-generated method stub return true; } + + public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) { + __log.debug("findRoute {}", correlationKeySet); + + Query qry = null; + + if(isCorrleationKeySetPreInitialized){ + qry = getEM().createQuery(ROUTE_BY_CKEY_HEADER + " and route._correlationKey = :s0"); + qry.setParameter("corr", this); + qry.setParameter("s0", correlationKeySet.toCanonicalString()); + } else { + List<CorrelationKeySet> subSets = correlationKeySet.findSubSets(); + qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets)); + qry.setParameter("corr", this); + for (int i = 0; i < subSets.size(); i++) { + qry.setParameter("s" + i, subSets.get(i).toCanonicalString()); + } + } + + List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) qry.getResultList(); + if (candidateRoutes.size() > 0) { + List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>(); + boolean routed = false; + for (int i = 0; i < candidateRoutes.size(); i++) { + MessageRouteDAO route = candidateRoutes.get(i); + if ("all".equals(route.getRoute())) { + matchingRoutes.add(route); + } else { + if (!routed) { + matchingRoutes.add(route); + } + routed = true; + } + } + + __log.debug("findRoute found {}",matchingRoutes); + return matchingRoutes; + } else { + __log.debug("findRoute found nothing"); + return null; + } + } }
