Removed ODE-1049.patch committed by mistake
Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/80519edf Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/80519edf Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/80519edf Branch: refs/heads/master Commit: 80519edff75c8550e2f94a276813d2c0706e5758 Parents: 85cf15f Author: sathwik <[email protected]> Authored: Wed Jun 28 17:03:29 2017 +0530 Committer: sathwik <[email protected]> Committed: Wed Jun 28 17:03:29 2017 +0530 ---------------------------------------------------------------------- ODE-1049.patch | 379 ---------------------------------------------------- 1 file changed, 379 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/80519edf/ODE-1049.patch ---------------------------------------------------------------------- diff --git a/ODE-1049.patch b/ODE-1049.patch deleted file mode 100644 index 7f790ae..0000000 --- a/ODE-1049.patch +++ /dev/null @@ -1,379 +0,0 @@ -commit 58b1a735ddb58c381e7ef356e8c70f3f6f71aa93 -Author: sathwik <[email protected]> -Date: Sat Mar 5 12:12:04 2016 +0530 - - ODE-1049: Optimised findRoute and dequeueMessage queries to use pre inistialized CorrelationKeySet cannonical value. - -diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java -index 7a2556c0d..21cd8eb87 100644 ---- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java -+++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java -@@ -83,9 +83,11 @@ public interface CorrelatorDAO { - Collection<CorrelatorMessageDAO> getAllMessages(); - - /** -+ * @deprecated - * Find a route matching the given correlation key. - * @param correlationKey correlation key - * @return route matching the given correlation key -+ * @see findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) - */ - List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet); - -@@ -115,4 +117,17 @@ public interface CorrelatorDAO { - * @return all routes registered on this correlator, use with care as it can potentially return a lot of values - */ - Collection<MessageRouteDAO> getAllRoutes(); -+ -+ /** -+ * Find a route matching the given correlation key set. -+ * If the correlationKeySet is known to be pre initialized then isCorrleationKeySetPreInitialized can be set to true or false otherwise. -+ * Depending on the value of isCorrleationKeySetPreInitialized, -+ * true - canonical value of the correlationKeySet might be used to find the route. -+ * false - canonical value of each of the subset of correlationKeySet might be used to find the route. -+ * -+ * @param correlationKeySet correlation key -+ * @param isCorrleationKeySetPreInitialized -+ * @return route matching the given correlation key -+ */ -+ List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized); - } -diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java -index aff7733f4..480094e13 100644 ---- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java -+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java -@@ -1482,11 +1482,13 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext { - if (BpelProcess.__log.isDebugEnabled()) { - __log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckeySet=" + ckeySet); - } -+ - CorrelatorDAO correlator = _dao.getProcess().getCorrelator(correlatorId); - -+ - // Find the route first, this is a SELECT FOR UPDATE on the "selector" row, - // So we want to acquire the lock before we do anthing else. -- List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet); -+ List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet,true); - if (mroutes == null || mroutes.size() == 0) { - // Ok, this means that a message arrived before we did, so nothing to do. - __log.debug("MatcherEvent handling: nothing to do, route no longer in DB"); -@@ -1495,13 +1497,15 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext { - - // Now see if there is a message that matches this selector. - MessageExchangeDAO mexdao = correlator.dequeueMessage(ckeySet); -+ - if (mexdao != null) { - __log.debug("MatcherEvent handling: found matching message in DB (i.e. message arrived before <receive>)"); -- if( MessageExchangePattern.REQUEST_RESPONSE.toString().equals(mexdao.getPattern())) { -- __log.warn("A message arrived before a receive is ready for a request/response pattern. This may be processed to success. However, you should consider revising your process since a TCP port and a container thread will be held for a longer time and the process will not scale under heavy load."); -- } -- -+ if( MessageExchangePattern.REQUEST_RESPONSE.toString().equals(mexdao.getPattern())) { -+ __log.warn("A message arrived before a receive is ready for a request/response pattern. This may be processed to success. However, you should consider revising your process since a TCP port and a container thread will be held for a longer time and the process will not scale under heavy load."); -+ } -+ - for (MessageRouteDAO mroute : mroutes) { -+ __log.debug("Removing routes for GroupID: {} Instance: {}",mroute.getGroupId(),_dao.getInstanceId()); - // We have a match, so we can get rid of the routing entries. - correlator.removeRoutes(mroute.getGroupId(), _dao); - } -diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java -index 5de76f2f8..f6261d47f 100644 ---- a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java -+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java -@@ -66,30 +66,7 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements CorrelatorDAO { - } - - public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) { -- List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); -- -- assert keySet != null; -- -- if (__log.isDebugEnabled()) { -- __log.debug("findRoute: keySet=" + keySet); -- } -- boolean routed = false; -- for (MessageRouteDaoImpl route : _routes) { -- assert route._ckeySet != null; -- -- if(keySet.isRoutableTo(route._ckeySet, "all".equals(route.getRoute()))) { -- if ("all".equals(route.getRoute())) { -- routes.add(route); -- } else { -- if (!routed) { -- routes.add(route); -- } -- routed = true; -- } -- } -- } -- -- return routes; -+ return findRoute(keySet, false); - } - - public String getCorrelatorId() { -@@ -186,4 +163,29 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements CorrelatorDAO { - return true; - } - -+ public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) { -+ List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); -+ -+ assert correlationKeySet != null; -+ boolean routed = false; -+ -+ __log.debug("findRoute: keySet={}",correlationKeySet); -+ -+ for (MessageRouteDaoImpl route : _routes) { -+ assert route._ckeySet != null; -+ -+ if(correlationKeySet.isRoutableTo(route._ckeySet, "all".equals(route.getRoute()))) { -+ if ("all".equals(route.getRoute())) { -+ routes.add(route); -+ } else { -+ if (!routed) { -+ routes.add(route); -+ } -+ routed = true; -+ } -+ } -+ } -+ -+ return routes; -+ } - } -diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java -index 3f530cf4a..a467c2db1 100644 ---- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java -+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java -@@ -56,7 +56,7 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { - "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId)").intern(); - - /** Query for removing routes. */ -- private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where groupId = ? and instance = ?"; -+ private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where instance = ? and groupId = ?"; - - private HCorrelator _hobj; - -@@ -75,12 +75,8 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { - - __log.debug("dequeueMessage({}): ",keySet); - -- List<CorrelationKeySet> subSets = keySet.findSubSets(); -- Query qry = getSession().createFilter(_hobj.getMessageCorrelations(), -- generateUnmatchedQuery(subSets)); -- for( int i = 0; i < subSets.size(); i++ ) { -- qry.setString("s" + i, subSets.get(i).toCanonicalString()); -- } -+ Query qry = getSession().createFilter(_hobj.getMessageCorrelations()," where this.correlationKey = :s0"); -+ qry.setString("s0", keySet.toCanonicalString()); - - // We really should consider the possibility of multiple messages matching a criteria. - // When the message is handled, its not too convenient to attempt to determine if the -@@ -120,46 +116,7 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { - - @SuppressWarnings("unchecked") - public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) { -- List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); -- -- entering("CorrelatorDaoImpl.findRoute"); -- String hdr = "findRoute(keySet=" + keySet + "): "; -- if (__log.isDebugEnabled()) __log.debug(hdr); -- -- //String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString(); -- List<CorrelationKeySet> subSets = keySet.findSubSets(); -- -- //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets)); -- Query q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets)); -- q.setEntity("correlator", getHibernateObj()); -- -- for( int i = 0; i < subSets.size(); i++ ) { -- q.setString("s" + i, subSets.get(i).toCanonicalString()); -- } -- // Make sure we obtain a lock for the selector we want to find. -- q.setLockMode("hs", LockMode.UPGRADE); -- -- List<HProcessInstance> targets = new ArrayList<HProcessInstance>(); -- List<HCorrelatorSelector> list; -- try { -- list = (List<HCorrelatorSelector>) q.list(); -- } catch (LockAcquisitionException e) { -- throw new Scheduler.JobProcessorException(e, true); -- } -- for (HCorrelatorSelector selector : list) { -- if (selector != null) { -- boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute()); -- if ("all".equals(selector.getRoute()) || -- (isRoutePolicyOne && !targets.contains(selector.getInstance()))) { -- routes.add(new MessageRouteDaoImpl(_sm, selector)); -- targets.add(selector.getInstance()); -- } -- } -- } -- -- if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes); -- -- return routes; -+ return findRoute(keySet,false); - } - - private String generateUnmatchedQuery(List<CorrelationKeySet> subSets) { -@@ -271,8 +228,8 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { - __log.debug(hdr); - Session session = getSession(); - Query q = session.createQuery(QRY_DELSELECTORS); -- q.setString(0, routeGroupId); // groupId -- q.setEntity(1, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance -+ q.setEntity(0, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance -+ q.setString(1, routeGroupId); // groupId - int updates = q.executeUpdate(); - session.flush(); // explicit flush to ensure route removed - if (__log.isDebugEnabled()) -@@ -293,4 +250,52 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { - return routes; - } - -+ public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet,boolean isCorrleationKeySetPreInitialized) { -+ List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); -+ -+ entering("CorrelatorDaoImpl.findRoute"); -+ __log.debug("findRoute(keySet={})",keySet); -+ -+ //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets)); -+ Query q = null; -+ if(isCorrleationKeySetPreInitialized){ -+ q = getSession().createQuery(FLTR_SELECTORS + " and hs.correlationKey = :s0"); -+ q.setEntity("correlator", getHibernateObj()); -+ q.setString("s0", keySet.toCanonicalString()); -+ } else { -+ List<CorrelationKeySet> subSets = keySet.findSubSets(); -+ q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets)); -+ q.setEntity("correlator", getHibernateObj()); -+ -+ for( int i = 0; i < subSets.size(); i++ ) { -+ q.setString("s" + i, subSets.get(i).toCanonicalString()); -+ } -+ } -+ // Make sure we obtain a lock for the selector we want to find. -+ q.setLockMode("hs", LockMode.UPGRADE); -+ -+ List<HProcessInstance> targets = new ArrayList<HProcessInstance>(); -+ List<HCorrelatorSelector> list; -+ try { -+ list = (List<HCorrelatorSelector>) q.list(); -+ } catch (LockAcquisitionException e) { -+ throw new Scheduler.JobProcessorException(e, true); -+ } -+ for (HCorrelatorSelector selector : list) { -+ __log.debug("selector returned form findRoute {} and targets {}", selector.getInstance().getId(),targets); -+ if (selector != null) { -+ boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute()); -+ if ("all".equals(selector.getRoute()) || -+ (isRoutePolicyOne && !targets.contains(selector.getInstance()))) { -+ __log.debug("selector added for targets {}", selector.getInstance().getId()); -+ routes.add(new MessageRouteDaoImpl(_sm, selector)); -+ targets.add(selector.getInstance()); -+ } -+ } -+ } -+ -+ __log.debug("findRoute(keySet={}) found {}",keySet,routes); -+ -+ return routes; -+ } - } -diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java -index 9f6fbce79..514fda8c1 100644 ---- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java -+++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java -@@ -101,41 +101,7 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO { - - @SuppressWarnings("unchecked") - public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet) { -- if (__log.isDebugEnabled()) { -- __log.debug("findRoute " + correlationKeySet); -- } -- List<CorrelationKeySet> subSets = correlationKeySet.findSubSets(); -- Query qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets)); -- qry.setParameter("corr", this); -- for (int i = 0; i < subSets.size(); i++) { -- qry.setParameter("s" + i, subSets.get(i).toCanonicalString()); -- } -- -- List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) qry.getResultList(); -- if (candidateRoutes.size() > 0) { -- List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>(); -- boolean routed = false; -- for (int i = 0; i < candidateRoutes.size(); i++) { -- MessageRouteDAO route = candidateRoutes.get(i); -- if ("all".equals(route.getRoute())) { -- matchingRoutes.add(route); -- } else { -- if (!routed) { -- matchingRoutes.add(route); -- } -- routed = true; -- } -- } -- if (__log.isDebugEnabled()) { -- __log.debug("findRoute found " + matchingRoutes); -- } -- return matchingRoutes; -- } else { -- if (__log.isDebugEnabled()) { -- __log.debug("findRoute found nothing"); -- } -- return null; -- } -+ return findRoute(correlationKeySet, false); - } - - private String generateSelectorQuery(String header, List<CorrelationKeySet> subSets) { -@@ -199,4 +165,46 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO { - // TODO Auto-generated method stub - return true; - } -+ -+ public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) { -+ __log.debug("findRoute {}", correlationKeySet); -+ -+ Query qry = null; -+ -+ if(isCorrleationKeySetPreInitialized){ -+ qry = getEM().createQuery(ROUTE_BY_CKEY_HEADER + " and route._correlationKey = :s0"); -+ qry.setParameter("corr", this); -+ qry.setParameter("s0", correlationKeySet.toCanonicalString()); -+ } else { -+ List<CorrelationKeySet> subSets = correlationKeySet.findSubSets(); -+ qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets)); -+ qry.setParameter("corr", this); -+ for (int i = 0; i < subSets.size(); i++) { -+ qry.setParameter("s" + i, subSets.get(i).toCanonicalString()); -+ } -+ } -+ -+ List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) qry.getResultList(); -+ if (candidateRoutes.size() > 0) { -+ List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>(); -+ boolean routed = false; -+ for (int i = 0; i < candidateRoutes.size(); i++) { -+ MessageRouteDAO route = candidateRoutes.get(i); -+ if ("all".equals(route.getRoute())) { -+ matchingRoutes.add(route); -+ } else { -+ if (!routed) { -+ matchingRoutes.add(route); -+ } -+ routed = true; -+ } -+ } -+ -+ __log.debug("findRoute found {}",matchingRoutes); -+ return matchingRoutes; -+ } else { -+ __log.debug("findRoute found nothing"); -+ return null; -+ } -+ } - }
