Author: mriou
Date: Wed Feb 11 23:36:35 2009
New Revision: 743565

URL: http://svn.apache.org/viewvc?rev=743565&view=rev
Log:
Tying up a few loose ends in route, register and unregister, making deploy and 
retire more reliable. Added a recovery mechanism in case we routed to a retired 
process for the creation of a new process.

Modified:
    
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
    
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java

Modified: 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?rev=743565&r1=743564&r2=743565&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
 Wed Feb 11 23:36:35 2009
@@ -173,7 +173,6 @@
      * 
      * @param target
      * @param meps
-     * @param istyle
      * @return
      * @throws BpelEngineException
      */
@@ -247,44 +246,19 @@
             if (__log.isDebugEnabled())
                 __log.debug("Deactivating process " + p.getPID());
 
-            Endpoint processEndpoint = null;
-//             for (Endpoint endpoint : _serviceMap.keySet()) {
-//                     List<BpelProcess> processes = _serviceMap.get(endpoint);
-//                     for (BpelProcess candidate : processes) {
-//                             if (candidate.getPID().equals(process)) {
-//                                     processes.remove(candidate);
-//                                     processEndpoint = endpoint;
-//                             }
-//                     }
-//             }
-                   Iterator<Map.Entry<Endpoint,List<BpelProcess>>> serviceIter 
= _serviceMap.entrySet().iterator();
-                   while (serviceIter.hasNext()) {
-                       Map.Entry<Endpoint,List<BpelProcess>> processEntry = 
serviceIter.next();
-                       List<BpelProcess> entryProcesses = 
processEntry.getValue();
-                       for (int i = 0; i < entryProcesses.size(); i++) {
-                               BpelProcess entryProcess = 
entryProcesses.get(i);
-                           if (entryProcess.getPID().equals(process)) {
-                               processEndpoint = processEntry.getKey();
-                               entryProcesses.remove(entryProcess);
-                           }
-                       }
-                   }
-
-            // Only deactivating if no other process (version) need that 
endpoint anymore
-            // We're only routing using an endpoint/process map for now which 
means that deploying
-            // several versions of the same process using the same endpoint 
(which is the common
-            // case) will override previous deployments endpoints. So checking 
the endpoint is not
-            // enough, we also have to check other versions of the same 
process.
-            // A bit clunky, the maps held here should be retought a bit.
-            boolean otherVersions = false;
-            for (BpelProcess bpelProcess : _activeProcesses.values()) {
-                if (bpelProcess.getProcessType().equals(p.getProcessType()))
-                    otherVersions = true;
+            Iterator<Map.Entry<Endpoint,List<BpelProcess>>> serviceIter = 
_serviceMap.entrySet().iterator();
+            while (serviceIter.hasNext()) {
+                Map.Entry<Endpoint,List<BpelProcess>> processEntry = 
serviceIter.next();
+                Iterator<BpelProcess> entryProcesses = 
processEntry.getValue().iterator();
+                while (entryProcesses.hasNext()) {
+                    BpelProcess entryProcess = entryProcesses.next();
+                    if (entryProcess.getPID().equals(process)) {
+                        entryProcesses.remove();
+                    }
+                }
             }
-            // Deactivate process anyway because it now checks for shared 
endpoints 
-//            if (_serviceMap.get(processEndpoint) == null && !otherVersions) {
-                p.deactivate();
-//            }
+
+            p.deactivate();
         }
         return p;
     }
@@ -307,13 +281,14 @@
                _serviceMap.put(e, processes);
             }
             // Remove any older version of the process from the list
-            for (int i = 0; i < processes.size(); i++) {
-               BpelProcess cachedVersion = processes.get(i);
-               __log.debug("cached version " + cachedVersion.getPID() + " vs 
registering version " + process.getPID());
-               if 
(cachedVersion.getProcessType().equals(process.getProcessType())) {
-                       processes.remove(cachedVersion);
-                       cachedVersion.deactivate();
-               }
+            Iterator<BpelProcess> processesIter = processes.iterator();
+            while (processesIter.hasNext()) {
+                BpelProcess cachedVersion = processesIter.next();
+                __log.debug("cached version " + cachedVersion.getPID() + " vs 
registering version " + process.getPID());
+                if 
(cachedVersion.getProcessType().equals(process.getProcessType())) {
+                    processesIter.remove();
+                    cachedVersion.deactivate();
+                }
             }
             processes.add(process);
         }

Modified: 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=743565&r1=743564&r2=743565&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 Wed Feb 11 23:36:35 2009
@@ -60,6 +60,7 @@
 import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
 import org.apache.ode.bpel.runtime.PROCESS;
 import org.apache.ode.bpel.runtime.PropertyAliasEvaluationContext;
+import org.apache.ode.bpel.runtime.InvalidProcessException;
 import org.apache.ode.bpel.runtime.channels.FaultData;
 import org.apache.ode.jacob.soup.ReplacementMap;
 import org.apache.ode.utils.ObjectPrinter;
@@ -375,12 +376,10 @@
         try {
             _hydrationLatch.latch(1);
             markused();
-
             if (__log.isDebugEnabled()) {
                 
__log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[] 
{ "jobData", jobData }));
             }
 
-
             WorkEvent we = new WorkEvent(jobData);
 
             // Process level events
@@ -389,7 +388,26 @@
                     __log.debug("InvokeInternal event for mexid " + 
we.getMexId());
                 }
                 MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) 
_engine.getMessageExchange(we.getMexId());
-                invokeProcess(mex);
+                try {
+                    invokeProcess(mex);
+                } catch (InvalidProcessException e) {
+                    // we're invoking a target process, trying to see if we 
can retarget the message
+                    // to the current version (only applies when it's a new 
process creation)
+                    if (e.getCauseCode() == 
InvalidProcessException.RETIRED_CAUSE_CODE) {
+                        boolean found = false;
+                        for (BpelProcess process : 
getEngine()._activeProcesses.values()) {
+                            if 
(process.getConf().getState().equals(org.apache.ode.bpel.iapi.ProcessState.ACTIVE)
+                                    && 
process.getConf().getType().equals(getConf().getType())) {
+                                we.setProcessId(process._pid);
+                                mex._process = process;
+                                found = true;
+                                process.handleWorkEvent(jobData);
+                                break;
+                            }
+                        }
+                        if (!found) throw e;
+                    } else throw e;
+                }
             } else {
                 // Instance level events
                 ProcessInstanceDAO procInstance = 
getProcessDAO().getInstance(we.getIID());
@@ -960,4 +978,4 @@
         return _pconf;
     }
 
-}
\ No newline at end of file
+}

Modified: 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?rev=743565&r1=743564&r2=743565&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
 Wed Feb 11 23:36:35 2009
@@ -21,6 +21,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -119,12 +120,12 @@
         if (!processInterceptors(this, 
InterceptorInvoker.__onBpelServerInvoked))
             return null;
 
-        List<BpelProcess> targets = _engine.route(getDAO().getCallee(), 
request);
+        BpelProcess target = _process;
 
         if (__log.isDebugEnabled())
-            __log.debug("invoke() EPR= " + _epr + " ==> " + targets);
+            __log.debug("invoke() EPR= " + _epr + " ==> " + target);
 
-        if (targets == null || targets.isEmpty()) {
+        if (target == null) {
             if (__log.isWarnEnabled())
                 __log.warn(__msgs.msgUnknownEPR("" + _epr));
 
@@ -132,29 +133,24 @@
             setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, null, 
null);
             return null;
         } else {
-               for (BpelProcess target : targets) {
-                       if (target.getPID().equals(_process.getPID())) {
-                           // Schedule a new job for invocation
-                           WorkEvent we = new WorkEvent();
-                           we.setType(WorkEvent.Type.INVOKE_INTERNAL);
-                           if (target.isInMemory()) we.setInMem(true);
-                           we.setProcessId(target.getPID());
-                           we.setMexId(getDAO().getMessageExchangeId());
-               
-                           if (getOperation().getOutput() != null) {
-                               ResponseCallback callback = new 
ResponseCallback();
-                               _waitingCallbacks.put(getClientId(), callback);
-                           }
-               
-                           setStatus(Status.ASYNC);
-                           if (target.isInMemory())
-                               
_engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
-                           else
-                               
_engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
-                           return new ResponseFuture(getClientId());
-                       }
-               }
-               return null;
+            // Schedule a new job for invocation
+            WorkEvent we = new WorkEvent();
+            we.setType(WorkEvent.Type.INVOKE_INTERNAL);
+            if (target.isInMemory()) we.setInMem(true);
+            we.setProcessId(target.getPID());
+            we.setMexId(getDAO().getMessageExchangeId());
+
+            if (getOperation().getOutput() != null) {
+                ResponseCallback callback = new ResponseCallback();
+                _waitingCallbacks.put(getClientId(), callback);
+            }
+
+            setStatus(Status.ASYNC);
+            if (target.isInMemory())
+                _engine._contexts.scheduler.scheduleVolatileJob(true, 
we.getDetail());
+            else
+                
_engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+            return new ResponseFuture(getClientId());
         }
     }
 


Reply via email to