Repository: ode Updated Branches: refs/heads/ode-1.3.x ba61210b4 -> 501f70f20
ODE-974: Routing logic for messages arriving early has been fixed Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/501f70f2 Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/501f70f2 Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/501f70f2 Branch: refs/heads/ode-1.3.x Commit: 501f70f200d02d82fc0607ccd5d7d8b76b000b6e Parents: ba61210 Author: sathwik <[email protected]> Authored: Wed Jan 20 16:56:36 2016 +0530 Committer: sathwik <[email protected]> Committed: Wed Jan 20 16:56:36 2016 +0530 ---------------------------------------------------------------------- .../org/apache/ode/bpel/dao/ProcessDAO.java | 10 ++ .../apache/ode/bpel/engine/BpelEngineImpl.java | 14 +- .../org/apache/ode/bpel/engine/BpelProcess.java | 141 ++++++++++++++----- .../ode/bpel/engine/BpelRuntimeContextImpl.java | 5 + .../ode/bpel/engine/PartnerLinkMyRoleImpl.java | 74 ++++++---- .../apache/ode/bpel/memdao/ProcessDaoImpl.java | 6 + .../ode/daohib/bpel/CorrelatorDaoImpl.java | 13 +- .../apache/ode/daohib/bpel/ProcessDaoImpl.java | 39 ++++- .../ode/daohib/bpel/hobj/HCorrelationSet.java | 5 + .../apache/ode/dao/jpa/CorrelatorDAOImpl.java | 10 +- .../ode/dao/jpa/MessageExchangeDAOImpl.java | 2 +- .../org/apache/ode/dao/jpa/ProcessDAOImpl.java | 14 +- 12 files changed, 251 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java ---------------------------------------------------------------------- diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java index 97c252b..c68ac53 100644 --- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java +++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java @@ -118,4 +118,14 @@ public interface ProcessDAO { * @return all instances that haven't completed, use with care as there could be a lot of them */ Collection<ProcessInstanceDAO> getActiveInstances(); + + /** + * Locates process instances for a specific process version that matches correlation key and instance state + * @param ckey + * Correlation key + * @param processInstanceState + * Instance state org.apache.ode.bpel.common.ProcessState + * @return collection of {@link ProcessInstanceDAO} that match correlation key, instance state + */ + Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey,short processInstanceState); } http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java index 6a1edce..03efdd6 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java @@ -483,15 +483,25 @@ public class BpelEngineImpl implements BpelEngine { if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == JobType.MEX_MATCHER) { List<BpelProcess> processes = getAllProcesses(we.getProcessId()); boolean routed = false; + + //try to find the target process and execute jobInfo.jobDetail.detailsExt.put("enqueue", false); for(BpelProcess proc : processes) { - routed = routed || proc.handleJobDetails(jobInfo.jobDetail); + routed = proc.handleJobDetails(jobInfo.jobDetail); + + if(routed) break; } + //no target process was identified, enqueue the mex for later processing if(!routed && we.getType() == JobType.INVOKE_INTERNAL) { jobInfo.jobDetail.detailsExt.put("enqueue", true); - process.handleJobDetails(jobInfo.jobDetail); + + for(BpelProcess proc : processes) { + routed = proc.handleJobDetails(jobInfo.jobDetail); + + if(routed) break; + } } } else { http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java index b5cef27..1a7ca92 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java @@ -21,9 +21,11 @@ package org.apache.ode.bpel.engine; import java.io.InputStream; import java.net.URI; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -36,12 +38,15 @@ import javax.xml.namespace.QName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.ode.agents.memory.SizingAgent; +import org.apache.ode.bpel.common.CorrelationKey; +import org.apache.ode.bpel.common.CorrelationKeySet; import org.apache.ode.bpel.common.FaultException; import org.apache.ode.bpel.common.ProcessState; import org.apache.ode.bpel.dao.BpelDAOConnection; import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable; import org.apache.ode.bpel.dao.ProcessDAO; import org.apache.ode.bpel.dao.ProcessInstanceDAO; +import org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl.RoutingInfo; import org.apache.ode.bpel.engine.extvar.ExternalVariableConf; import org.apache.ode.bpel.engine.extvar.ExternalVariableManager; import org.apache.ode.bpel.evt.ProcessInstanceEvent; @@ -51,6 +56,7 @@ import org.apache.ode.bpel.iapi.BpelEngineException; import org.apache.ode.bpel.iapi.Endpoint; import org.apache.ode.bpel.iapi.EndpointReference; import org.apache.ode.bpel.iapi.MessageExchange; +import org.apache.ode.bpel.iapi.MyRoleMessageExchange; import org.apache.ode.bpel.iapi.PartnerRoleChannel; import org.apache.ode.bpel.iapi.ProcessConf; import org.apache.ode.bpel.iapi.Scheduler; @@ -217,38 +223,17 @@ public class BpelProcess { return false; } - mex.getDAO().setProcess(getProcessDAO()); - - if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) { - __log.debug("Aborting processing of mex " + mex + " due to interceptors."); - return false; - } - - markused(); - - // Ideally, if Java supported closure, the routing code would return null or the appropriate - // closure to handle the route. - List<PartnerLinkMyRoleImpl.RoutingInfo> routings = null; - for (PartnerLinkMyRoleImpl target : targets) { - routings = target.findRoute(mex); - boolean createInstance = target.isCreateInstance(mex); - - if (mex.getStatus() != MessageExchange.Status.FAILURE && routings!=null) { - for (PartnerLinkMyRoleImpl.RoutingInfo routing : routings) { - routed = routed || invokeHandler.invoke(target, routing, createInstance); - } - } - if (routed) { - break; - } + //Actual identification of the routes and invocation of the target process will happen when enqueue is disabled. + //This is the main conditional block of code that does the actual work. + //Its only after running this block with enqueue disabled, it can be identified that mex was not routable. + //There is a separate logic following this 'if' condition to handle the mex when enqueue is enabled. + if(!enqueue) { + routed = findRouteAndInvoke(targets, mex, invokeHandler); } // Nothing found, saving for later - if (!routed && enqueue) { - // TODO this is kind of hackish when no match and more than one myrole is selected. - // we save the routing on the last myrole - // actually the message queue should be attached to the instance instead of the correlator - targets.get(targets.size()-1).noRoutingMatch(mex, routings); + if (enqueue && !routed) { + routed = noRoutingMatch(targets, mex); } else { // Now we have to update our message exchange status. If the <reply> was not hit during the // invocation, then we will be in the "REQUEST" phase which means that either this was a one-way @@ -263,12 +248,100 @@ public class BpelProcess { _hydrationLatch.release(1); } - // For a one way, once the engine is done, the mex can be safely released. - // Sean: not really, if route is not found, we cannot delete the mex yet -// if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY) && routed && getCleanupCategories(false).contains(CLEANUP_CATEGORY.MESSAGES)) { -// mex.release(); -// } + return routed; + } + + //this method should be invoked within the ambit of _hydrationLatch + private boolean findRouteAndInvoke(List<PartnerLinkMyRoleImpl> targets, MyRoleMessageExchangeImpl mex, InvokeHandler invokeHandler) { + boolean routed = false; + + //if mex is already queued then disallow overriding of process on the mex + if(!MyRoleMessageExchange.CorrelationStatus.QUEUED.equals(mex.getCorrelationStatus())) + mex.getDAO().setProcess(getProcessDAO()); + + if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) { + __log.debug("Aborting processing of mex " + mex + " due to interceptors."); + return false; + } + + markused(); + + // Ideally, if Java supported closure, the routing code would return null or the appropriate + // closure to handle the route. + List<PartnerLinkMyRoleImpl.RoutingInfo> routings = null; + for (PartnerLinkMyRoleImpl target : targets) { + routings = target.findRoute(mex); + boolean createInstance = target.isCreateInstance(mex); + + if (mex.getStatus() != MessageExchange.Status.FAILURE && routings!=null) { + for (PartnerLinkMyRoleImpl.RoutingInfo routing : routings) { + routed = routed || invokeHandler.invoke(target, routing, createInstance); + } + } + if (routed) { + break; + } + } + return routed; + } + + //this method should be invoked within the ambit of _hydrationLatch + private boolean noRoutingMatch(List<PartnerLinkMyRoleImpl> targets,MyRoleMessageExchangeImpl mex) { + boolean routed = false; + boolean enqueue = true; + List<PartnerLinkMyRoleImpl.RoutingInfo> routings = null; + + /* try to find the active instance that is waiting on the correlated value and then associate the corresponding + * processDAO,correlator on the mex and enqueue for later processing. + */ + + // Need to iterate over the partnerlinks to identify the partnerlink that has the operation defined in the mex. + for (Iterator<PartnerLinkMyRoleImpl> targetItr = targets.iterator(); + (targetItr.hasNext() && !routed);) { + + PartnerLinkMyRoleImpl target = targetItr.next(); + routings = target.findRoute(mex,enqueue); + + //routings will be null if the mex operation is no defined in this myRole, iterate over next myRole. + if (routings != null) { + RoutingInfo routing = routings.get(routings.size()-1); + + if (routing != null) { + + for( Iterator<CorrelationKeySet> aSubSetItr = routing.wholeKeySet.findSubSets().iterator(); + (aSubSetItr.hasNext() && !routed);) { + CorrelationKeySet aSubSet = aSubSetItr.next(); + + for(Iterator<CorrelationKey> keyItr = aSubSet.iterator(); + (keyItr.hasNext() && !routed);) { + + CorrelationKey key = keyItr.next(); + __log.info("noRoutingMatch: Finding active instance correlated with {} and process pid {}",new Object[]{key,_pid}); + + // Assumption is, process instance is uniquely identifiable by any single initiated correlation key across multiple versions + // of a same process type. + + // We need to make sure the PID of process of the instance is same as that of the + // partnerlink's associated process in the iteration. Otherwise we might end up + // associating wrong correlator with the mex. + Collection<ProcessInstanceDAO> instanceDaoList = getProcessDAO().findInstance(key,ProcessState.STATE_ACTIVE); + + if (!instanceDaoList.isEmpty()) { + ProcessInstanceDAO instance = instanceDaoList.iterator().next(); + mex.getDAO().setProcess(instance.getProcess()); + target.noRoutingMatch(mex, routing); + routed = true; + + __log.info("noRoutingMatch: Active instance found instanceID: {} correlated with {} and process pid {}", + new Object[]{instance.getInstanceId(),key,_pid}); + + } + } + } + } + } + } return routed; } http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java index 8ac70c3..aff7733 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java @@ -1517,6 +1517,11 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext { MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess, _bpelProcess._engine, mexdao); + mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED); + mex.getDAO().setInstance(_dao); + if (mex.getDAO().getCreateTime() == null) + mex.getDAO().setCreateTime(getCurrentEventDateTime()); + inputMsgMatch(mroute.getGroupId(), mroute.getIndex(), mex); execute(); http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java index 203a199..42dfd7a 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java @@ -94,8 +94,12 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl { } public List<RoutingInfo> findRoute(MyRoleMessageExchangeImpl mex) { - List<RoutingInfo> routingInfos = new ArrayList<RoutingInfo>(); - + return findRoute(mex, false); + } + + public List<RoutingInfo> findRoute(MyRoleMessageExchangeImpl mex, boolean enqueue) { + List<RoutingInfo> routingInfos = new ArrayList<RoutingInfo>(); + if (__log.isTraceEnabled()) { __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] { "messageExchange", mex })); @@ -132,7 +136,7 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl { mex.setFailure(MessageExchange.FailureType.FORMAT_ERROR, ime.getMessage(), null); return null; } - + String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID); String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID); if (__log.isDebugEnabled()) { @@ -141,19 +145,22 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl { + " partnerSessionId=" + partnerSessionId); } - // Try to find a route for one of our keys. - List<MessageRouteDAO> messageRoutes = correlator.findRoute(keySet); - if (messageRoutes != null && messageRoutes.size() > 0) { - for (MessageRouteDAO messageRoute : messageRoutes) { - if (__log.isDebugEnabled()) { - __log.debug("INPUTMSG: " + correlatorId + ": ckeySet " + messageRoute.getCorrelationKeySet() + " route is to " + messageRoute); + //Avoid searching for message route when enqueue is enabled. It is only when no message route is found, enqueue will be enabled. + if(!enqueue){ + // Try to find a route for one of our keys. + List<MessageRouteDAO> messageRoutes = correlator.findRoute(keySet); + if (messageRoutes != null && messageRoutes.size() > 0) { + for (MessageRouteDAO messageRoute : messageRoutes) { + if (__log.isDebugEnabled()) { + __log.debug("INPUTMSG: " + correlatorId + ": ckeySet " + messageRoute.getCorrelationKeySet() + " route is to " + messageRoute); + } + routingInfos.add(new RoutingInfo(messageRoute, messageRoute.getCorrelationKeySet(), correlator, keySet)); } - routingInfos.add(new RoutingInfo(messageRoute, messageRoute.getCorrelationKeySet(), correlator, keySet)); } } - + if (routingInfos.size() == 0) { - routingInfos.add(new RoutingInfo(null, null, correlator, keySet)); + routingInfos.add(new RoutingInfo(null, null, correlator, keySet)); } return routingInfos; @@ -252,31 +259,38 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl { instance.execute(); } + /** + * @deprecated - Now an attempt is made to identify the correct routing in the BpelProcess.invokeProcess() with enqueue enabled. + * @param mex + * @param routings + */ public void noRoutingMatch(MyRoleMessageExchangeImpl mex, List<RoutingInfo> routings) { + // enqueue message with the last message route, as per the comments in caller (@see BpelProcess.invokeProcess()) + RoutingInfo routing = (routings != null && routings.size() > 0) ? routings.get(routings.size() - 1) : null; + noRoutingMatch(mex, routing); + } + + public void noRoutingMatch(MyRoleMessageExchangeImpl mex, RoutingInfo routing) { if (!mex.isAsynchronous()) { mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null); } else { - // enqueue message with the last message route, as per the comments in caller (@see BpelProcess.invokeProcess()) - RoutingInfo routing = - (routings != null && routings.size() > 0) ? - routings.get(routings.size() - 1) : null; - if (routing != null) { + if (routing != null) { if (__log.isDebugEnabled()) { __log.debug("INPUTMSG: " + routing.correlator.getCorrelatorId() + ": SAVING to DB (no match) "); } - // send event - CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex - .getOperation().getName(), mex.getMessageExchangeId(), routing.wholeKeySet); - - evt.setProcessId(_process.getProcessDAO().getProcessId()); - evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName())); - _process._debugger.onEvent(evt); - - mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED); - - // No match, means we add message exchange to the queue. - routing.correlator.enqueueMessage(mex.getDAO(), routing.wholeKeySet); + // send event + CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex + .getOperation().getName(), mex.getMessageExchangeId(), routing.wholeKeySet); + + evt.setProcessId(_process.getProcessDAO().getProcessId()); + evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName())); + _process._debugger.onEvent(evt); + + mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED); + + // No match, means we add message exchange to the queue. + routing.correlator.enqueueMessage(mex.getDAO(), routing.wholeKeySet); //Second matcher needs to be registered here JobDetails we = new JobDetails(); @@ -289,7 +303,7 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl { }else{ _process._engine._contexts.scheduler.schedulePersistedJob(we, null); } - } + } } } http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java index 3e1ebd7..41d94f2 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java @@ -251,4 +251,10 @@ class ProcessDaoImpl extends DaoBaseImpl implements ProcessDAO { } } } + + @Override + public Collection<ProcessInstanceDAO> findInstance(CorrelationKey cckey, short processInstanceState) { + // TODO Auto-generated method stub + return null; + } } http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java ---------------------------------------------------------------------- diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java index c4cbd6c..11aaf6b 100644 --- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java +++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java @@ -50,9 +50,10 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { /** filter for finding a matching selector. */ private static final String LOCK_SELECTORS = "update from HCorrelatorSelector as hs set hs.lock = hs.lock+1 where hs.processType = :processType"; private static final String CHECK_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId"; - private static final String FLTR_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId"; + //private static final String FLTR_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId"; + private static final String FLTR_SELECTORS = "from HCorrelatorSelector as hs where hs.correlator = :correlator"; private static final String FLTR_SELECTORS_SUBQUERY = ("from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlatorId = " + - "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId )").intern(); + "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId)").intern(); /** Query for removing routes. */ private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where groupId = ? and instance = ?"; @@ -115,12 +116,12 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { String hdr = "findRoute(keySet=" + keySet + "): "; if (__log.isDebugEnabled()) __log.debug(hdr); - String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString(); + //String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString(); List<CorrelationKeySet> subSets = keySet.findSubSets(); - Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets)); - q.setString("processType", processType); - q.setString("correlatorId", _hobj.getCorrelatorId()); + //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets)); + Query q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets)); + q.setEntity("correlator", getHibernateObj()); for( int i = 0; i < subSets.size(); i++ ) { q.setString("s" + i, subSets.get(i).toCanonicalString()); http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java ---------------------------------------------------------------------- diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java index 278eb73..8d45610 100644 --- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java +++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java @@ -35,6 +35,7 @@ import org.apache.ode.bpel.dao.CorrelatorDAO; import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable; import org.apache.ode.bpel.dao.ProcessDAO; import org.apache.ode.bpel.dao.ProcessInstanceDAO; +import org.apache.ode.bpel.dao.ScopeDAO; import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY; import org.apache.ode.daohib.SessionManager; import org.apache.ode.daohib.bpel.hobj.HActivityRecovery; @@ -54,6 +55,8 @@ import org.apache.ode.daohib.bpel.hobj.HProcessInstance; import org.apache.ode.daohib.bpel.hobj.HScope; import org.apache.ode.daohib.bpel.hobj.HVariableProperty; import org.apache.ode.daohib.bpel.hobj.HXmlData; +import org.apache.ode.utils.stl.CollectionsX; +import org.apache.ode.utils.stl.UnaryFunction; import org.hibernate.Criteria; import org.hibernate.Hibernate; import org.hibernate.Query; @@ -134,11 +137,18 @@ public class ProcessDaoImpl extends HibernateDao implements ProcessDAO, Deferred @SuppressWarnings("unchecked") public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckeyValue) { entering("ProcessDaoImpl.findInstance"); - Criteria criteria = getSession().createCriteria(HCorrelationSet.class); - criteria.add(Expression.eq("scope.instance.process.id",_process.getId())); - criteria.add(Expression.eq("value", ckeyValue.toCanonicalString())); - criteria.addOrder(Order.desc("scope.instance.created")); - return criteria.list(); + Query qry = getSession().getNamedQuery(HCorrelationSet.SELECT_INSTANCES_BY_CORSETS); + qry.setParameter("ckey", ckeyValue.toCanonicalString()); + Collection<HProcessInstance> resultList = qry.list(); + + ArrayList<ProcessInstanceDAO> ret = new ArrayList<ProcessInstanceDAO>(); + CollectionsX.transform(ret, resultList, new UnaryFunction<HProcessInstance,ProcessInstanceDAO> () { + public ProcessInstanceDAO apply(HProcessInstance x) { + return new ProcessInstanceDaoImpl(_sm, x); + } + }); + + return ret; } /** @@ -296,4 +306,23 @@ public class ProcessDaoImpl extends HibernateDao implements ProcessDAO, Deferred public String getGuid() { return _process.getGuid(); } + + @Override + public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey, short processInstanceState) { + entering("ProcessDaoImpl.findInstance"); + Query qry = getSession().getNamedQuery(HCorrelationSet.SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS); + qry.setParameter("ckey", ckey.toCanonicalString()); + qry.setEntity("process", getHibernateObj()); + qry.setShort("state", processInstanceState); + Collection<HProcessInstance> resultList = qry.list(); + + ArrayList<ProcessInstanceDAO> ret = new ArrayList<ProcessInstanceDAO>(); + CollectionsX.transform(ret, resultList, new UnaryFunction<HProcessInstance,ProcessInstanceDAO> () { + public ProcessInstanceDAO apply(HProcessInstance x) { + return new ProcessInstanceDaoImpl(_sm, x); + } + }); + + return ret; + } } http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java ---------------------------------------------------------------------- diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java index 1c0dba9..1656989 100644 --- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java +++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java @@ -28,11 +28,16 @@ import java.util.Collection; * @hibernate.query name="SELECT_CORSET_IDS_BY_INSTANCES" query="select id from HCorrelationSet as c where c.instance in (:instances)" * @hibernate.query name="SELECT_CORSETS_BY_INSTANCES" query="from HCorrelationSet as c left join fetch c.properties where c.instance.id in (:instances)" * @hibernate.query name="SELECT_CORSETS_BY_PROCESS_STATES" query="from HCorrelationSet as c left join fetch c.process left join fetch c.instance where c.instance.state in (:states)" + * @hibernate.query name="SELECT_INSTANCES_BY_CORSETS" query="select cs.scope.instance from HCorrelationSet as cs where cs.value = :ckey" + * @hibernate.query name="SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS" query="select cs.scope.instance from HCorrelationSet as cs where cs.value = :ckey and + * cs.process = :process and cs.instance.state = :state" */ public class HCorrelationSet extends HObject{ public static final String SELECT_CORSET_IDS_BY_INSTANCES = "SELECT_CORSET_IDS_BY_INSTANCES"; public static final String SELECT_CORSETS_BY_INSTANCES = "SELECT_CORSETS_BY_INSTANCES"; public static final String SELECT_CORSETS_BY_PROCESS_STATES = "SELECT_CORSETS_BY_PROCESS_STATES"; + public static final String SELECT_INSTANCES_BY_CORSETS = "SELECT_INSTANCES_BY_CORSETS"; + public static final String SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS = "SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS"; private HProcess _process; private HProcessInstance _instance; http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java ---------------------------------------------------------------------- diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java index 84681b7..9f6fbce 100644 --- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java +++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java @@ -36,7 +36,7 @@ import java.util.List; public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO { private static Logger __log = LoggerFactory.getLogger(CorrelatorDAOImpl.class); public final static String DELETE_CORRELATORS_BY_PROCESS = "DELETE_CORRELATORS_BY_PROCESS"; - private final static String ROUTE_BY_CKEY_HEADER = "select route from MessageRouteDAOImpl as route where route._correlator._process._processType = :ptype and route._correlator._correlatorKey = :corrkey"; + private final static String ROUTE_BY_CKEY_HEADER = "select route from MessageRouteDAOImpl as route where route._correlator = :corr "; @Id @Column(name = "CORRELATOR_ID") @@ -77,6 +77,11 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO { MessageExchangeDAOImpl mex = itr.next(); if (mex.getCorrelationKeySet().isRoutableTo(correlationKeySet, false)) { itr.remove(); + MessageExchangeDAOImpl mexImpl = (MessageExchangeDAOImpl) mex; + mexImpl.setCorrelationKeySet(null); + mexImpl.setCorrelator(null); + //getEM().remove(mex); + //getEM().flush(); return mex; } } @@ -101,8 +106,7 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO { } List<CorrelationKeySet> subSets = correlationKeySet.findSubSets(); Query qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets)); - qry.setParameter("ptype", _process.getType().toString()); - qry.setParameter("corrkey", _correlatorKey); + qry.setParameter("corr", this); for (int i = 0; i < subSets.size(); i++) { qry.setParameter("s" + i, subSets.get(i).toCanonicalString()); } http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java ---------------------------------------------------------------------- diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java index c0f35ff..e334520 100644 --- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java +++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java @@ -323,7 +323,7 @@ public class MessageExchangeDAOImpl extends OpenJPADAO implements MessageExchang } void setCorrelationKeySet(CorrelationKeySet correlationKeySet) { - _correlationKeys = correlationKeySet.toCanonicalString(); + _correlationKeys = correlationKeySet != null ? correlationKeySet.toCanonicalString() : null; } CorrelationKeySet getCorrelationKeySet() { http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java ---------------------------------------------------------------------- diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java index 2b22d08..c109fda 100644 --- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java +++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java @@ -44,7 +44,9 @@ import java.util.List; @NamedQueries({ @NamedQuery(name="ActiveInstances", query="select i from ProcessInstanceDAOImpl as i where i._process = :process and i._state = :state"), @NamedQuery(name="InstanceByCKey", query="select cs._scope._processInstance from CorrelationSetDAOImpl as cs where cs._correlationKey = :ckey"), - @NamedQuery(name="CorrelatorByKey", query="select c from CorrelatorDAOImpl as c where c._correlatorKey = :ckey and c._process = :process") + @NamedQuery(name="CorrelatorByKey", query="select c from CorrelatorDAOImpl as c where c._correlatorKey = :ckey and c._process = :process"), + @NamedQuery(name="InstanceByCKeyProcessState", query="select cs._scope._processInstance from CorrelationSetDAOImpl as cs where cs._correlationKey = :ckey and " + + "cs._scope._processInstance._process = :process and cs._scope._processInstance._state = :state") }) public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO { private static final Logger __log = LoggerFactory.getLogger(ProcessDAOImpl.class); @@ -226,4 +228,14 @@ public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO { qry.setParameter("state", ProcessState.STATE_ACTIVE); return qry.getResultList(); } + + @Override + public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey, short processInstanceState) { + //need to make this query more efficient + Query qry = getEM().createNamedQuery("InstanceByCKeyProcessState"); + qry.setParameter("ckey", ckey.toCanonicalString()); + qry.setParameter("process", this); + qry.setParameter("state", ProcessState.STATE_ACTIVE); + return qry.getResultList(); + } }
