Repository: ode Updated Branches: refs/heads/ode-1.3.x 07894e7e2 -> 6be91e10a
ODE-974: enqueue the message for Active Process even if no instance was identified waiting for the message Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/6be91e10 Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/6be91e10 Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/6be91e10 Branch: refs/heads/ode-1.3.x Commit: 6be91e10a7c01a6f487f5130d24a5de4d8ea4be0 Parents: 07894e7 Author: sathwik <[email protected]> Authored: Wed Feb 24 19:23:53 2016 +0530 Committer: sathwik <[email protected]> Committed: Wed Feb 24 19:23:53 2016 +0530 ---------------------------------------------------------------------- .../apache/ode/bpel/engine/BpelEngineImpl.java | 9 ++ .../org/apache/ode/bpel/engine/BpelProcess.java | 88 ++++++++++++-------- .../ode/bpel/engine/replayer/Replayer.java | 2 +- .../ReplayerBpelRuntimeContextImpl.java | 2 +- .../bpel/engine/replayer/ReplayerContext.java | 2 +- 5 files changed, 65 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/6be91e10/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java index 03efdd6..63d829e 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java @@ -486,6 +486,7 @@ public class BpelEngineImpl implements BpelEngine { //try to find the target process and execute jobInfo.jobDetail.detailsExt.put("enqueue", false); + jobInfo.jobDetail.detailsExt.put("enqueueForFutureInstance", false); for(BpelProcess proc : processes) { routed = proc.handleJobDetails(jobInfo.jobDetail); @@ -496,12 +497,20 @@ public class BpelEngineImpl implements BpelEngine { //no target process was identified, enqueue the mex for later processing if(!routed && we.getType() == JobType.INVOKE_INTERNAL) { jobInfo.jobDetail.detailsExt.put("enqueue", true); + jobInfo.jobDetail.detailsExt.put("enqueueForFutureInstance", false); + //try to identify any waiting instance to register this early message for(BpelProcess proc : processes) { routed = proc.handleJobDetails(jobInfo.jobDetail); if(routed) break; } + + //no active instance was found, then enqueue this early message for a future instance. + if(!routed){ + jobInfo.jobDetail.detailsExt.put("enqueueForFutureInstance", true); + process.handleJobDetails(jobInfo.jobDetail); + } } } else { http://git-wip-us.apache.org/repos/asf/ode/blob/6be91e10/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java index 92aad6f..606b73b 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java @@ -210,7 +210,7 @@ public class BpelProcess { boolean invoke(PartnerLinkMyRoleImpl target, PartnerLinkMyRoleImpl.RoutingInfo routingInfo, boolean createInstance); } - public boolean invokeProcess(MyRoleMessageExchangeImpl mex, InvokeHandler invokeHandler, boolean enqueue) { + public boolean invokeProcess(MyRoleMessageExchangeImpl mex, InvokeHandler invokeHandler, boolean enqueue, boolean enqueueForFutureInstance) { boolean routed = false; try { @@ -223,17 +223,17 @@ public class BpelProcess { return false; } - //Actual identification of the routes and invocation of the target process will happen when enqueue is disabled. + //Actual identification of the routes and invocation of the target process will happen when enqueue and enqueueForFutureInstance is disabled. //This is the main conditional block of code that does the actual work. - //Its only after running this block with enqueue disabled, it can be identified that mex was not routable. - //There is a separate logic following this 'if' condition to handle the mex when enqueue is enabled. - if(!enqueue) { + //Its only after running this block with enqueue and enqueueForFutureInstance disabled, it can be identified that mex was not routable. + //There is a separate logic following this 'if' condition to handle the mex when enqueue or enqueueForFutureInstance is enabled. + if(!(enqueue || enqueueForFutureInstance)) { routed = findRouteAndInvoke(targets, mex, invokeHandler); } // Nothing found, saving for later - if (enqueue && !routed) { - routed = noRoutingMatch(targets, mex); + if ((enqueue || enqueueForFutureInstance) && !routed) { + routed = noRoutingMatch(targets, mex, enqueue, enqueueForFutureInstance); } else { // Now we have to update our message exchange status. If the <reply> was not hit during the // invocation, then we will be in the "REQUEST" phase which means that either this was a one-way @@ -286,9 +286,8 @@ public class BpelProcess { } //this method should be invoked within the ambit of _hydrationLatch - private boolean noRoutingMatch(List<PartnerLinkMyRoleImpl> targets,MyRoleMessageExchangeImpl mex) { + private boolean noRoutingMatch(List<PartnerLinkMyRoleImpl> targets,MyRoleMessageExchangeImpl mex, boolean enqueue, boolean enqueueForFutureInstance) { boolean routed = false; - boolean enqueue = true; List<PartnerLinkMyRoleImpl.RoutingInfo> routings = null; LinkedHashSet<ProcessInstanceDAO> intersectionInstanceSet = new LinkedHashSet<ProcessInstanceDAO>(); @@ -301,7 +300,7 @@ public class BpelProcess { (targetItr.hasNext() && !routed);) { PartnerLinkMyRoleImpl target = targetItr.next(); - routings = target.findRoute(mex,enqueue); + routings = target.findRoute(mex,enqueue||enqueueForFutureInstance); //routings will be null if the mex operation is no defined in this myRole, iterate over next myRole. if (routings != null) { @@ -309,35 +308,47 @@ public class BpelProcess { if (routing != null) { - for( Iterator<CorrelationKey> keyItr = routing.wholeKeySet.iterator(); - (keyItr.hasNext() && !routed);) { + if(enqueueForFutureInstance && isActive()) { + mex.getDAO().setProcess(getProcessDAO()); + target.noRoutingMatch(mex, routing); + routed = true; + + __log.info("noRoutingMatch: mex has been registered for a future Instance and process pid {}",_pid); + + return routed; + } + + if(enqueue) { + for( Iterator<CorrelationKey> keyItr = routing.wholeKeySet.iterator(); + (keyItr.hasNext() && !routed);) { - CorrelationKey key = keyItr.next(); - __log.info("noRoutingMatch: Finding active instance correlated with {} and process pid {}",key,_pid); + CorrelationKey key = keyItr.next(); + __log.info("noRoutingMatch: Finding active instance correlated with {} and process pid {}",key,_pid); - // We need to make sure the PID of process of the instance is same as that of the - // partnerlink's associated process in the iteration. Otherwise we might end up - // associating wrong correlator with the mex. - Collection<ProcessInstanceDAO> instanceDaoList = getProcessDAO().findInstance(key,ProcessState.STATE_ACTIVE); + // We need to make sure the PID of process of the instance is same as that of the + // partnerlink's associated process in the iteration. Otherwise we might end up + // associating wrong correlator with the mex. + Collection<ProcessInstanceDAO> instanceDaoList = getProcessDAO().findInstance(key,ProcessState.STATE_ACTIVE); - if (!(instanceDaoList.isEmpty())) { - //find the intersection - if(!intersectionInstanceSet.isEmpty()) - intersectionInstanceSet.retainAll(instanceDaoList); - else - intersectionInstanceSet.addAll(instanceDaoList); + if (!(instanceDaoList.isEmpty())) { + //find the intersection + if(!intersectionInstanceSet.isEmpty()) + intersectionInstanceSet.retainAll(instanceDaoList); + else + intersectionInstanceSet.addAll(instanceDaoList); - __log.debug("noRoutingMatch: intersection Instance set : {} ",intersectionInstanceSet); + __log.debug("noRoutingMatch: intersection Instance set : {} ",intersectionInstanceSet); + } } - } - if(!(intersectionInstanceSet.isEmpty())) { - ProcessInstanceDAO instance = intersectionInstanceSet.iterator().next(); - mex.getDAO().setProcess(instance.getProcess()); - target.noRoutingMatch(mex, routing); - routed = true; + if(!(intersectionInstanceSet.isEmpty())) { + ProcessInstanceDAO instance = intersectionInstanceSet.iterator().next(); + mex.getDAO().setProcess(instance.getProcess()); + target.noRoutingMatch(mex, routing); + routed = true; - __log.info("noRoutingMatch: Active instance found instanceID: {} and process pid {}",instance.getInstanceId(),_pid); + __log.info("noRoutingMatch: Active instance found instanceID: {} and process pid {}",instance.getInstanceId(),_pid); + } } } } @@ -353,8 +364,15 @@ public class BpelProcess { * Entry point for message exchanges aimed at the my role. * * @param mex + * @param enqueue - if enabled, mex will be considered as an early arriving message and an attempt will be made to identify the process instance which might be waiting for it. + * @param enqueueForFutureInstance - if enabled, mex will be considered as an early arriving message and will be registered with the Active Process as no Instance was waiting for it. + * + * Note: if both enqueueForFutureInstance and enqueue are enabled, then enqueueForFutureInstance will take precedence. + * if(enqueueForFutureInstance and current process is active) mex is registered with the active process + * else if(enqueue) an attempt is made to identify any instance that might be waiting + * */ - boolean invokeProcess(final MyRoleMessageExchangeImpl mex, boolean enqueue) { + boolean invokeProcess(final MyRoleMessageExchangeImpl mex, boolean enqueue,boolean enqueueForFutureInstance) { return invokeProcess(mex, new InvokeHandler() { public boolean invoke(PartnerLinkMyRoleImpl target, PartnerLinkMyRoleImpl.RoutingInfo routing, boolean createInstance) { if (routing.messageRoute == null && createInstance && isActive()) { @@ -369,7 +387,7 @@ public class BpelProcess { } return false; } - },enqueue); + },enqueue,enqueueForFutureInstance); } /** Several myroles can use the same service in a given process */ @@ -508,7 +526,7 @@ public class BpelProcess { return true; } - routed = invokeProcess(mex, (Boolean) jobData.detailsExt.get("enqueue")); + routed = invokeProcess(mex, (Boolean) jobData.detailsExt.get("enqueue"),(Boolean) jobData.detailsExt.get("enqueueForFutureInstance")); if (we.getType() == JobType.MEX_MATCHER && routed) { mex.getDAO().releasePremieMessages(); } http://git-wip-us.apache.org/repos/asf/ode/blob/6be91e10/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java index 6d07b13..599169a 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java @@ -274,7 +274,7 @@ public class Replayer { } return false; } - },true); + },true,false); } else if (we.getType() == JobType.INVOKE_RESPONSE) { __log.debug("reply for live communication"); ReplayerContext ctx = findReplayedInstance(we.getInstanceId()); http://git-wip-us.apache.org/repos/asf/ode/blob/6be91e10/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java index 713abc0..0115a70 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java @@ -308,7 +308,7 @@ public class ReplayerBpelRuntimeContextImpl extends BpelRuntimeContextImpl { } return false; } - },true); + },true,false); } http://git-wip-us.apache.org/repos/asf/ode/blob/6be91e10/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java index a34beec..1a24991 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java @@ -270,7 +270,7 @@ public class ReplayerContext { } return false; } - },true); + },true,false); for (Exchange e : exchanges) { if (e.getType() == ExchangeType.M) {
