Author: mriou
Date: Wed Feb 11 23:36:35 2009
New Revision: 743565
URL: http://svn.apache.org/viewvc?rev=743565&view=rev
Log:
Tying up a few loose ends in route, register and unregister, making deploy and
retire more reliable. Added a recovery mechanism for the case where we routed to
a retired process for the creation of a new process.
Modified:
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Modified:
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?rev=743565&r1=743564&r2=743565&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
(original)
+++
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
Wed Feb 11 23:36:35 2009
@@ -173,7 +173,6 @@
*
* @param target
* @param meps
- * @param istyle
* @return
* @throws BpelEngineException
*/
@@ -247,44 +246,19 @@
if (__log.isDebugEnabled())
__log.debug("Deactivating process " + p.getPID());
- Endpoint processEndpoint = null;
-// for (Endpoint endpoint : _serviceMap.keySet()) {
-// List<BpelProcess> processes = _serviceMap.get(endpoint);
-// for (BpelProcess candidate : processes) {
-// if (candidate.getPID().equals(process)) {
-// processes.remove(candidate);
-// processEndpoint = endpoint;
-// }
-// }
-// }
- Iterator<Map.Entry<Endpoint,List<BpelProcess>>> serviceIter
= _serviceMap.entrySet().iterator();
- while (serviceIter.hasNext()) {
- Map.Entry<Endpoint,List<BpelProcess>> processEntry =
serviceIter.next();
- List<BpelProcess> entryProcesses =
processEntry.getValue();
- for (int i = 0; i < entryProcesses.size(); i++) {
- BpelProcess entryProcess =
entryProcesses.get(i);
- if (entryProcess.getPID().equals(process)) {
- processEndpoint = processEntry.getKey();
- entryProcesses.remove(entryProcess);
- }
- }
- }
-
- // Only deactivating if no other process (version) need that endpoint anymore
- // We're only routing using an endpoint/process map for now which means that deploying
- // several versions of the same process using the same endpoint (which is the common
- // case) will override previous deployments endpoints. So checking the endpoint is not
- // enough, we also have to check other versions of the same process.
- // A bit clunky, the maps held here should be retought a bit.
- boolean otherVersions = false;
- for (BpelProcess bpelProcess : _activeProcesses.values()) {
- if (bpelProcess.getProcessType().equals(p.getProcessType()))
- otherVersions = true;
+ Iterator<Map.Entry<Endpoint,List<BpelProcess>>> serviceIter =
_serviceMap.entrySet().iterator();
+ while (serviceIter.hasNext()) {
+ Map.Entry<Endpoint,List<BpelProcess>> processEntry =
serviceIter.next();
+ Iterator<BpelProcess> entryProcesses =
processEntry.getValue().iterator();
+ while (entryProcesses.hasNext()) {
+ BpelProcess entryProcess = entryProcesses.next();
+ if (entryProcess.getPID().equals(process)) {
+ entryProcesses.remove();
+ }
+ }
}
- // Deactivate process anyway because it now checks for shared endpoints
-// if (_serviceMap.get(processEndpoint) == null && !otherVersions) {
- p.deactivate();
-// }
+
+ p.deactivate();
}
return p;
}
@@ -307,13 +281,14 @@
_serviceMap.put(e, processes);
}
// Remove any older version of the process from the list
- for (int i = 0; i < processes.size(); i++) {
- BpelProcess cachedVersion = processes.get(i);
- __log.debug("cached version " + cachedVersion.getPID() + " vs
registering version " + process.getPID());
- if
(cachedVersion.getProcessType().equals(process.getProcessType())) {
- processes.remove(cachedVersion);
- cachedVersion.deactivate();
- }
+ Iterator<BpelProcess> processesIter = processes.iterator();
+ while (processesIter.hasNext()) {
+ BpelProcess cachedVersion = processesIter.next();
+ __log.debug("cached version " + cachedVersion.getPID() + " vs
registering version " + process.getPID());
+ if
(cachedVersion.getProcessType().equals(process.getProcessType())) {
+ processesIter.remove();
+ cachedVersion.deactivate();
+ }
}
processes.add(process);
}
Modified:
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=743565&r1=743564&r2=743565&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
(original)
+++
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Wed Feb 11 23:36:35 2009
@@ -60,6 +60,7 @@
import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
import org.apache.ode.bpel.runtime.PROCESS;
import org.apache.ode.bpel.runtime.PropertyAliasEvaluationContext;
+import org.apache.ode.bpel.runtime.InvalidProcessException;
import org.apache.ode.bpel.runtime.channels.FaultData;
import org.apache.ode.jacob.soup.ReplacementMap;
import org.apache.ode.utils.ObjectPrinter;
@@ -375,12 +376,10 @@
try {
_hydrationLatch.latch(1);
markused();
-
if (__log.isDebugEnabled()) {
__log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[]
{ "jobData", jobData }));
}
-
WorkEvent we = new WorkEvent(jobData);
// Process level events
@@ -389,7 +388,26 @@
__log.debug("InvokeInternal event for mexid " +
we.getMexId());
}
MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl)
_engine.getMessageExchange(we.getMexId());
- invokeProcess(mex);
+ try {
+ invokeProcess(mex);
+ } catch (InvalidProcessException e) {
+ // we're invoking a target process, trying to see if we can retarget the message
+ // to the current version (only applies when it's a new process creation)
+ if (e.getCauseCode() ==
InvalidProcessException.RETIRED_CAUSE_CODE) {
+ boolean found = false;
+ for (BpelProcess process :
getEngine()._activeProcesses.values()) {
+ if
(process.getConf().getState().equals(org.apache.ode.bpel.iapi.ProcessState.ACTIVE)
+ &&
process.getConf().getType().equals(getConf().getType())) {
+ we.setProcessId(process._pid);
+ mex._process = process;
+ found = true;
+ process.handleWorkEvent(jobData);
+ break;
+ }
+ }
+ if (!found) throw e;
+ } else throw e;
+ }
} else {
// Instance level events
ProcessInstanceDAO procInstance =
getProcessDAO().getInstance(we.getIID());
@@ -960,4 +978,4 @@
return _pconf;
}
-}
\ No newline at end of file
+}
Modified:
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?rev=743565&r1=743564&r2=743565&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
(original)
+++
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Wed Feb 11 23:36:35 2009
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -119,12 +120,12 @@
if (!processInterceptors(this,
InterceptorInvoker.__onBpelServerInvoked))
return null;
- List<BpelProcess> targets = _engine.route(getDAO().getCallee(),
request);
+ BpelProcess target = _process;
if (__log.isDebugEnabled())
- __log.debug("invoke() EPR= " + _epr + " ==> " + targets);
+ __log.debug("invoke() EPR= " + _epr + " ==> " + target);
- if (targets == null || targets.isEmpty()) {
+ if (target == null) {
if (__log.isWarnEnabled())
__log.warn(__msgs.msgUnknownEPR("" + _epr));
@@ -132,29 +133,24 @@
setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, null,
null);
return null;
} else {
- for (BpelProcess target : targets) {
- if (target.getPID().equals(_process.getPID())) {
- // Schedule a new job for invocation
- WorkEvent we = new WorkEvent();
- we.setType(WorkEvent.Type.INVOKE_INTERNAL);
- if (target.isInMemory()) we.setInMem(true);
- we.setProcessId(target.getPID());
- we.setMexId(getDAO().getMessageExchangeId());
-
- if (getOperation().getOutput() != null) {
- ResponseCallback callback = new
ResponseCallback();
- _waitingCallbacks.put(getClientId(), callback);
- }
-
- setStatus(Status.ASYNC);
- if (target.isInMemory())
-
_engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
- else
-
_engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
- return new ResponseFuture(getClientId());
- }
- }
- return null;
+ // Schedule a new job for invocation
+ WorkEvent we = new WorkEvent();
+ we.setType(WorkEvent.Type.INVOKE_INTERNAL);
+ if (target.isInMemory()) we.setInMem(true);
+ we.setProcessId(target.getPID());
+ we.setMexId(getDAO().getMessageExchangeId());
+
+ if (getOperation().getOutput() != null) {
+ ResponseCallback callback = new ResponseCallback();
+ _waitingCallbacks.put(getClientId(), callback);
+ }
+
+ setStatus(Status.ASYNC);
+ if (target.isInMemory())
+ _engine._contexts.scheduler.scheduleVolatileJob(true,
we.getDetail());
+ else
+
_engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+ return new ResponseFuture(getClientId());
}
}