Repository: ode
Updated Branches:
  refs/heads/ode-1.3.x 07894e7e2 -> 6be91e10a


ODE-974: enqueue the message for the active process even if no instance
waiting for the message was identified


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/6be91e10
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/6be91e10
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/6be91e10

Branch: refs/heads/ode-1.3.x
Commit: 6be91e10a7c01a6f487f5130d24a5de4d8ea4be0
Parents: 07894e7
Author: sathwik <[email protected]>
Authored: Wed Feb 24 19:23:53 2016 +0530
Committer: sathwik <[email protected]>
Committed: Wed Feb 24 19:23:53 2016 +0530

----------------------------------------------------------------------
 .../apache/ode/bpel/engine/BpelEngineImpl.java  |  9 ++
 .../org/apache/ode/bpel/engine/BpelProcess.java | 88 ++++++++++++--------
 .../ode/bpel/engine/replayer/Replayer.java      |  2 +-
 .../ReplayerBpelRuntimeContextImpl.java         |  2 +-
 .../bpel/engine/replayer/ReplayerContext.java   |  2 +-
 5 files changed, 65 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/6be91e10/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
index 03efdd6..63d829e 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
@@ -486,6 +486,7 @@ public class BpelEngineImpl implements BpelEngine {
 
                     //try to find the target process and execute
                     jobInfo.jobDetail.detailsExt.put("enqueue", false);
+                    jobInfo.jobDetail.detailsExt.put("enqueueForFutureInstance", false);
 
                     for(BpelProcess proc : processes) {
                         routed = proc.handleJobDetails(jobInfo.jobDetail);
@@ -496,12 +497,20 @@ public class BpelEngineImpl implements BpelEngine {
                     //no target process was identified, enqueue the mex for later processing
                     if(!routed && we.getType() == JobType.INVOKE_INTERNAL) {
                         jobInfo.jobDetail.detailsExt.put("enqueue", true);
+                        jobInfo.jobDetail.detailsExt.put("enqueueForFutureInstance", false);
 
+                        //try to identify any waiting instance to register this early message
                         for(BpelProcess proc : processes) {
                             routed = proc.handleJobDetails(jobInfo.jobDetail);
 
                             if(routed) break;
                         }
+
+                        //no active instance was found, then enqueue this early message for a future instance.
+                        if(!routed){
+                            jobInfo.jobDetail.detailsExt.put("enqueueForFutureInstance", true);
+                            process.handleJobDetails(jobInfo.jobDetail);
+                        }
                     }
                 }
                 else {

http://git-wip-us.apache.org/repos/asf/ode/blob/6be91e10/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
index 92aad6f..606b73b 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
@@ -210,7 +210,7 @@ public class BpelProcess {
         boolean invoke(PartnerLinkMyRoleImpl target, PartnerLinkMyRoleImpl.RoutingInfo routingInfo, boolean createInstance);
     }
 
-    public boolean invokeProcess(MyRoleMessageExchangeImpl mex, InvokeHandler invokeHandler, boolean enqueue) {
+    public boolean invokeProcess(MyRoleMessageExchangeImpl mex, InvokeHandler invokeHandler, boolean enqueue, boolean enqueueForFutureInstance) {
         boolean routed = false;
 
         try {
@@ -223,17 +223,17 @@ public class BpelProcess {
                 return false;
             }
 
-            //Actual identification of the routes and invocation of the target 
process will happen when enqueue is disabled.
+            //Actual identification of the routes and invocation of the target 
process will happen when enqueue and enqueueForFutureInstance is disabled.
             //This is the main conditional block of code that does the actual 
work.
-            //Its only after running this block with enqueue disabled, it can 
be identified that mex was not routable.
-            //There is a separate logic following this 'if' condition to 
handle the mex when enqueue is enabled.
-            if(!enqueue) {
+            //Its only after running this block with enqueue and 
enqueueForFutureInstance disabled, it can be identified that mex was not 
routable.
+            //There is a separate logic following this 'if' condition to 
handle the mex when enqueue or enqueueForFutureInstance is enabled.
+            if(!(enqueue || enqueueForFutureInstance)) {
                 routed = findRouteAndInvoke(targets, mex, invokeHandler);
             }
 
             // Nothing found, saving for later
-            if (enqueue && !routed) {
-                routed = noRoutingMatch(targets, mex);
+            if ((enqueue || enqueueForFutureInstance) && !routed) {
+                routed = noRoutingMatch(targets, mex, enqueue, 
enqueueForFutureInstance);
             } else {
                 // Now we have to update our message exchange status. If the 
<reply> was not hit during the
                 // invocation, then we will be in the "REQUEST" phase which 
means that either this was a one-way
@@ -286,9 +286,8 @@ public class BpelProcess {
     }
 
     //this method should be invoked within the ambit of _hydrationLatch
-    private boolean noRoutingMatch(List<PartnerLinkMyRoleImpl> 
targets,MyRoleMessageExchangeImpl mex) {
+    private boolean noRoutingMatch(List<PartnerLinkMyRoleImpl> 
targets,MyRoleMessageExchangeImpl mex, boolean enqueue, boolean 
enqueueForFutureInstance) {
         boolean routed = false;
-        boolean enqueue = true;
         List<PartnerLinkMyRoleImpl.RoutingInfo> routings = null;
         LinkedHashSet<ProcessInstanceDAO> intersectionInstanceSet = new 
LinkedHashSet<ProcessInstanceDAO>();
 
@@ -301,7 +300,7 @@ public class BpelProcess {
                 (targetItr.hasNext() && !routed);) {
 
             PartnerLinkMyRoleImpl target = targetItr.next();
-            routings = target.findRoute(mex,enqueue);
+            routings = target.findRoute(mex,enqueue||enqueueForFutureInstance);
 
             //routings will be null if the mex operation is no defined in this 
myRole, iterate over next myRole.
             if (routings != null) {
@@ -309,35 +308,47 @@ public class BpelProcess {
 
                 if (routing != null) {
 
-                    for( Iterator<CorrelationKey> keyItr = 
routing.wholeKeySet.iterator();
-                            (keyItr.hasNext() && !routed);) {
+                    if(enqueueForFutureInstance && isActive()) {
+                        mex.getDAO().setProcess(getProcessDAO());
+                        target.noRoutingMatch(mex, routing);
+                        routed = true;
+
+                        __log.info("noRoutingMatch: mex has been registered 
for a future Instance and process pid {}",_pid);
+
+                        return routed;
+                    }
+
+                    if(enqueue) {
+                        for( Iterator<CorrelationKey> keyItr = 
routing.wholeKeySet.iterator();
+                                (keyItr.hasNext() && !routed);) {
 
-                        CorrelationKey key = keyItr.next();
-                        __log.info("noRoutingMatch: Finding active instance 
correlated with {} and process pid {}",key,_pid);
+                            CorrelationKey key = keyItr.next();
+                            __log.info("noRoutingMatch: Finding active 
instance correlated with {} and process pid {}",key,_pid);
 
-                        // We need to make sure the PID of process of the 
instance is same as that of the
-                        // partnerlink's associated process in the iteration. 
Otherwise we might end up
-                        // associating wrong correlator with the mex.
-                        Collection<ProcessInstanceDAO> instanceDaoList = 
getProcessDAO().findInstance(key,ProcessState.STATE_ACTIVE);
+                            // We need to make sure the PID of process of the 
instance is same as that of the
+                            // partnerlink's associated process in the 
iteration. Otherwise we might end up
+                            // associating wrong correlator with the mex.
+                            Collection<ProcessInstanceDAO> instanceDaoList = 
getProcessDAO().findInstance(key,ProcessState.STATE_ACTIVE);
 
-                        if (!(instanceDaoList.isEmpty())) {
-                            //find the intersection
-                            if(!intersectionInstanceSet.isEmpty())
-                                
intersectionInstanceSet.retainAll(instanceDaoList);
-                            else
-                                
intersectionInstanceSet.addAll(instanceDaoList);
+                            if (!(instanceDaoList.isEmpty())) {
+                                //find the intersection
+                                if(!intersectionInstanceSet.isEmpty())
+                                    
intersectionInstanceSet.retainAll(instanceDaoList);
+                                else
+                                    
intersectionInstanceSet.addAll(instanceDaoList);
 
-                            __log.debug("noRoutingMatch: intersection Instance 
set : {} ",intersectionInstanceSet);
+                                __log.debug("noRoutingMatch: intersection 
Instance set : {} ",intersectionInstanceSet);
+                            }
                         }
-                    }
 
-                    if(!(intersectionInstanceSet.isEmpty())) {
-                        ProcessInstanceDAO instance = 
intersectionInstanceSet.iterator().next();
-                        mex.getDAO().setProcess(instance.getProcess());
-                        target.noRoutingMatch(mex, routing);
-                        routed = true;
+                        if(!(intersectionInstanceSet.isEmpty())) {
+                            ProcessInstanceDAO instance = 
intersectionInstanceSet.iterator().next();
+                            mex.getDAO().setProcess(instance.getProcess());
+                            target.noRoutingMatch(mex, routing);
+                            routed = true;
 
-                        __log.info("noRoutingMatch: Active instance found 
instanceID: {} and process pid {}",instance.getInstanceId(),_pid);
+                            __log.info("noRoutingMatch: Active instance found 
instanceID: {} and process pid {}",instance.getInstanceId(),_pid);
+                        }
                     }
                 }
             }
@@ -353,8 +364,15 @@ public class BpelProcess {
      * Entry point for message exchanges aimed at the my role.
      *
      * @param mex
+     * @param enqueue - if enabled, mex will be considered as an early 
arriving message and an attempt will be made to identify the process instance 
which might be waiting for it. 
+     * @param enqueueForFutureInstance - if enabled, mex will be considered as 
an early arriving message and will be registered with the Active Process as no 
Instance was waiting for it.
+     *
+     *  Note: if both enqueueForFutureInstance and enqueue are enabled, then 
enqueueForFutureInstance will take precedence.
+     *  if(enqueueForFutureInstance and current process is active)  mex is 
registered with the active process
+     *  else if(enqueue) an attempt is made to identify any instance that 
might be waiting
+     *
      */
-    boolean invokeProcess(final MyRoleMessageExchangeImpl mex, boolean 
enqueue) {
+    boolean invokeProcess(final MyRoleMessageExchangeImpl mex, boolean 
enqueue,boolean enqueueForFutureInstance) {
         return invokeProcess(mex, new InvokeHandler() {
                     public boolean invoke(PartnerLinkMyRoleImpl target, 
PartnerLinkMyRoleImpl.RoutingInfo routing, boolean createInstance) {
                           if (routing.messageRoute == null && createInstance  
&& isActive()) {
@@ -369,7 +387,7 @@ public class BpelProcess {
                           }
                           return false;
                     }
-                },enqueue);
+                },enqueue,enqueueForFutureInstance);
     }
 
     /** Several myroles can use the same service in a given process */
@@ -508,7 +526,7 @@ public class BpelProcess {
                     return true;
                 }
 
-                routed = invokeProcess(mex, (Boolean) 
jobData.detailsExt.get("enqueue"));
+                routed = invokeProcess(mex, (Boolean) 
jobData.detailsExt.get("enqueue"),(Boolean) 
jobData.detailsExt.get("enqueueForFutureInstance"));
                 if (we.getType() == JobType.MEX_MATCHER && routed) {
                     mex.getDAO().releasePremieMessages();
                 }

http://git-wip-us.apache.org/repos/asf/ode/blob/6be91e10/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
index 6d07b13..599169a 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
@@ -274,7 +274,7 @@ public class Replayer {
                             }
                             return false;
                         }
-                    },true);
+                    },true,false);
         } else if (we.getType() == JobType.INVOKE_RESPONSE) {
             __log.debug("reply for live communication");
             ReplayerContext ctx = findReplayedInstance(we.getInstanceId());

http://git-wip-us.apache.org/repos/asf/ode/blob/6be91e10/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
----------------------------------------------------------------------
diff --git 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
index 713abc0..0115a70 100644
--- 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
+++ 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
@@ -308,7 +308,7 @@ public class ReplayerBpelRuntimeContextImpl extends 
BpelRuntimeContextImpl {
                 }
                 return false;
             }
-        },true);
+        },true,false);
 
     }
 

http://git-wip-us.apache.org/repos/asf/ode/blob/6be91e10/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
----------------------------------------------------------------------
diff --git 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
index a34beec..1a24991 100644
--- 
a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
+++ 
b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
@@ -270,7 +270,7 @@ public class ReplayerContext {
                                     }
                                     return false;
                                 }
-                            },true);
+                            },true,false);
 
                     for (Exchange e : exchanges) {
                         if (e.getType() == ExchangeType.M) {

Reply via email to