Repository: ode Updated Branches: refs/heads/ode-1.3.x 0342b4031 -> 58b1a735d
ODE-1049: Optimised findRoute and dequeueMessage queries to use pre-initialized CorrelationKeySet canonical value. Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/58b1a735 Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/58b1a735 Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/58b1a735 Branch: refs/heads/ode-1.3.x Commit: 58b1a735ddb58c381e7ef356e8c70f3f6f71aa93 Parents: 0342b40 Author: sathwik <[email protected]> Authored: Sat Mar 5 12:12:04 2016 +0530 Committer: sathwik <[email protected]> Committed: Sat Mar 5 12:12:04 2016 +0530 ---------------------------------------------------------------------- .../org/apache/ode/bpel/dao/CorrelatorDAO.java | 15 +++ .../ode/bpel/engine/BpelRuntimeContextImpl.java | 14 ++- .../ode/bpel/memdao/CorrelatorDaoImpl.java | 50 ++++----- .../ode/daohib/bpel/CorrelatorDaoImpl.java | 103 ++++++++++--------- .../apache/ode/dao/jpa/CorrelatorDAOImpl.java | 78 +++++++------- 5 files changed, 147 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java ---------------------------------------------------------------------- diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java index 7a2556c..21cd8eb 100644 --- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java +++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java @@ -83,9 +83,11 @@ public interface CorrelatorDAO { Collection<CorrelatorMessageDAO> getAllMessages(); /** + * @deprecated * Find a route matching the given correlation key. 
* @param correlationKey correlation key * @return route matching the given correlation key + * @see findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) */ List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet); @@ -115,4 +117,17 @@ public interface CorrelatorDAO { * @return all routes registered on this correlator, use with care as it can potentially return a lot of values */ Collection<MessageRouteDAO> getAllRoutes(); + + /** + * Find a route matching the given correlation key set. + * If the correlationKeySet is known to be pre initialized then isCorrleationKeySetPreInitialized can be set to true or false otherwise. + * Depending on the value of isCorrleationKeySetPreInitialized, + * true - canonical value of the correlationKeySet might be used to find the route. + * false - canonical value of each of the subset of correlationKeySet might be used to find the route. + * + * @param correlationKeySet correlation key + * @param isCorrleationKeySetPreInitialized + * @return route matching the given correlation key + */ + List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized); } http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java index aff7733..480094e 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java @@ -1482,11 +1482,13 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext { if (BpelProcess.__log.isDebugEnabled()) { __log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", 
ckeySet=" + ckeySet); } + CorrelatorDAO correlator = _dao.getProcess().getCorrelator(correlatorId); + // Find the route first, this is a SELECT FOR UPDATE on the "selector" row, // So we want to acquire the lock before we do anthing else. - List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet); + List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet,true); if (mroutes == null || mroutes.size() == 0) { // Ok, this means that a message arrived before we did, so nothing to do. __log.debug("MatcherEvent handling: nothing to do, route no longer in DB"); @@ -1495,13 +1497,15 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext { // Now see if there is a message that matches this selector. MessageExchangeDAO mexdao = correlator.dequeueMessage(ckeySet); + if (mexdao != null) { __log.debug("MatcherEvent handling: found matching message in DB (i.e. message arrived before <receive>)"); - if( MessageExchangePattern.REQUEST_RESPONSE.toString().equals(mexdao.getPattern())) { - __log.warn("A message arrived before a receive is ready for a request/response pattern. This may be processed to success. However, you should consider revising your process since a TCP port and a container thread will be held for a longer time and the process will not scale under heavy load."); - } - + if( MessageExchangePattern.REQUEST_RESPONSE.toString().equals(mexdao.getPattern())) { + __log.warn("A message arrived before a receive is ready for a request/response pattern. This may be processed to success. However, you should consider revising your process since a TCP port and a container thread will be held for a longer time and the process will not scale under heavy load."); + } + for (MessageRouteDAO mroute : mroutes) { + __log.debug("Removing routes for GroupID: {} Instance: {}",mroute.getGroupId(),_dao.getInstanceId()); // We have a match, so we can get rid of the routing entries. 
correlator.removeRoutes(mroute.getGroupId(), _dao); } http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java index 5de76f2..f6261d4 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java @@ -66,30 +66,7 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements CorrelatorDAO { } public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) { - List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); - - assert keySet != null; - - if (__log.isDebugEnabled()) { - __log.debug("findRoute: keySet=" + keySet); - } - boolean routed = false; - for (MessageRouteDaoImpl route : _routes) { - assert route._ckeySet != null; - - if(keySet.isRoutableTo(route._ckeySet, "all".equals(route.getRoute()))) { - if ("all".equals(route.getRoute())) { - routes.add(route); - } else { - if (!routed) { - routes.add(route); - } - routed = true; - } - } - } - - return routes; + return findRoute(keySet, false); } public String getCorrelatorId() { @@ -186,4 +163,29 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements CorrelatorDAO { return true; } + public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) { + List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); + + assert correlationKeySet != null; + boolean routed = false; + + __log.debug("findRoute: keySet={}",correlationKeySet); + + for (MessageRouteDaoImpl route : _routes) { + assert route._ckeySet != null; + + if(correlationKeySet.isRoutableTo(route._ckeySet, "all".equals(route.getRoute()))) { + if 
("all".equals(route.getRoute())) { + routes.add(route); + } else { + if (!routed) { + routes.add(route); + } + routed = true; + } + } + } + + return routes; + } } http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java ---------------------------------------------------------------------- diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java index 3f530cf..a467c2d 100644 --- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java +++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java @@ -56,7 +56,7 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId)").intern(); /** Query for removing routes. */ - private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where groupId = ? and instance = ?"; + private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where instance = ? and groupId = ?"; private HCorrelator _hobj; @@ -75,12 +75,8 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { __log.debug("dequeueMessage({}): ",keySet); - List<CorrelationKeySet> subSets = keySet.findSubSets(); - Query qry = getSession().createFilter(_hobj.getMessageCorrelations(), - generateUnmatchedQuery(subSets)); - for( int i = 0; i < subSets.size(); i++ ) { - qry.setString("s" + i, subSets.get(i).toCanonicalString()); - } + Query qry = getSession().createFilter(_hobj.getMessageCorrelations()," where this.correlationKey = :s0"); + qry.setString("s0", keySet.toCanonicalString()); // We really should consider the possibility of multiple messages matching a criteria. 
// When the message is handled, its not too convenient to attempt to determine if the @@ -120,46 +116,7 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { @SuppressWarnings("unchecked") public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) { - List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); - - entering("CorrelatorDaoImpl.findRoute"); - String hdr = "findRoute(keySet=" + keySet + "): "; - if (__log.isDebugEnabled()) __log.debug(hdr); - - //String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString(); - List<CorrelationKeySet> subSets = keySet.findSubSets(); - - //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets)); - Query q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets)); - q.setEntity("correlator", getHibernateObj()); - - for( int i = 0; i < subSets.size(); i++ ) { - q.setString("s" + i, subSets.get(i).toCanonicalString()); - } - // Make sure we obtain a lock for the selector we want to find. 
- q.setLockMode("hs", LockMode.UPGRADE); - - List<HProcessInstance> targets = new ArrayList<HProcessInstance>(); - List<HCorrelatorSelector> list; - try { - list = (List<HCorrelatorSelector>) q.list(); - } catch (LockAcquisitionException e) { - throw new Scheduler.JobProcessorException(e, true); - } - for (HCorrelatorSelector selector : list) { - if (selector != null) { - boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute()); - if ("all".equals(selector.getRoute()) || - (isRoutePolicyOne && !targets.contains(selector.getInstance()))) { - routes.add(new MessageRouteDaoImpl(_sm, selector)); - targets.add(selector.getInstance()); - } - } - } - - if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes); - - return routes; + return findRoute(keySet,false); } private String generateUnmatchedQuery(List<CorrelationKeySet> subSets) { @@ -271,8 +228,8 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { __log.debug(hdr); Session session = getSession(); Query q = session.createQuery(QRY_DELSELECTORS); - q.setString(0, routeGroupId); // groupId - q.setEntity(1, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance + q.setEntity(0, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance + q.setString(1, routeGroupId); // groupId int updates = q.executeUpdate(); session.flush(); // explicit flush to ensure route removed if (__log.isDebugEnabled()) @@ -293,4 +250,52 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { return routes; } + public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet,boolean isCorrleationKeySetPreInitialized) { + List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); + + entering("CorrelatorDaoImpl.findRoute"); + __log.debug("findRoute(keySet={})",keySet); + + //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? 
FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets)); + Query q = null; + if(isCorrleationKeySetPreInitialized){ + q = getSession().createQuery(FLTR_SELECTORS + " and hs.correlationKey = :s0"); + q.setEntity("correlator", getHibernateObj()); + q.setString("s0", keySet.toCanonicalString()); + } else { + List<CorrelationKeySet> subSets = keySet.findSubSets(); + q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets)); + q.setEntity("correlator", getHibernateObj()); + + for( int i = 0; i < subSets.size(); i++ ) { + q.setString("s" + i, subSets.get(i).toCanonicalString()); + } + } + // Make sure we obtain a lock for the selector we want to find. + q.setLockMode("hs", LockMode.UPGRADE); + + List<HProcessInstance> targets = new ArrayList<HProcessInstance>(); + List<HCorrelatorSelector> list; + try { + list = (List<HCorrelatorSelector>) q.list(); + } catch (LockAcquisitionException e) { + throw new Scheduler.JobProcessorException(e, true); + } + for (HCorrelatorSelector selector : list) { + __log.debug("selector returned form findRoute {} and targets {}", selector.getInstance().getId(),targets); + if (selector != null) { + boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute()); + if ("all".equals(selector.getRoute()) || + (isRoutePolicyOne && !targets.contains(selector.getInstance()))) { + __log.debug("selector added for targets {}", selector.getInstance().getId()); + routes.add(new MessageRouteDaoImpl(_sm, selector)); + targets.add(selector.getInstance()); + } + } + } + + __log.debug("findRoute(keySet={}) found {}",keySet,routes); + + return routes; + } } http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java ---------------------------------------------------------------------- diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java index 
9f6fbce..514fda8 100644 --- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java +++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java @@ -101,41 +101,7 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO { @SuppressWarnings("unchecked") public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet) { - if (__log.isDebugEnabled()) { - __log.debug("findRoute " + correlationKeySet); - } - List<CorrelationKeySet> subSets = correlationKeySet.findSubSets(); - Query qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets)); - qry.setParameter("corr", this); - for (int i = 0; i < subSets.size(); i++) { - qry.setParameter("s" + i, subSets.get(i).toCanonicalString()); - } - - List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) qry.getResultList(); - if (candidateRoutes.size() > 0) { - List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>(); - boolean routed = false; - for (int i = 0; i < candidateRoutes.size(); i++) { - MessageRouteDAO route = candidateRoutes.get(i); - if ("all".equals(route.getRoute())) { - matchingRoutes.add(route); - } else { - if (!routed) { - matchingRoutes.add(route); - } - routed = true; - } - } - if (__log.isDebugEnabled()) { - __log.debug("findRoute found " + matchingRoutes); - } - return matchingRoutes; - } else { - if (__log.isDebugEnabled()) { - __log.debug("findRoute found nothing"); - } - return null; - } + return findRoute(correlationKeySet, false); } private String generateSelectorQuery(String header, List<CorrelationKeySet> subSets) { @@ -199,4 +165,46 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO { // TODO Auto-generated method stub return true; } + + public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) { + __log.debug("findRoute {}", correlationKeySet); + + Query qry = null; + + if(isCorrleationKeySetPreInitialized){ + 
qry = getEM().createQuery(ROUTE_BY_CKEY_HEADER + " and route._correlationKey = :s0"); + qry.setParameter("corr", this); + qry.setParameter("s0", correlationKeySet.toCanonicalString()); + } else { + List<CorrelationKeySet> subSets = correlationKeySet.findSubSets(); + qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets)); + qry.setParameter("corr", this); + for (int i = 0; i < subSets.size(); i++) { + qry.setParameter("s" + i, subSets.get(i).toCanonicalString()); + } + } + + List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) qry.getResultList(); + if (candidateRoutes.size() > 0) { + List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>(); + boolean routed = false; + for (int i = 0; i < candidateRoutes.size(); i++) { + MessageRouteDAO route = candidateRoutes.get(i); + if ("all".equals(route.getRoute())) { + matchingRoutes.add(route); + } else { + if (!routed) { + matchingRoutes.add(route); + } + routed = true; + } + } + + __log.debug("findRoute found {}",matchingRoutes); + return matchingRoutes; + } else { + __log.debug("findRoute found nothing"); + return null; + } + } }
