Author: rr
Date: Sun May 23 20:20:56 2010
New Revision: 947487
URL: http://svn.apache.org/viewvc?rev=947487&view=rev
Log:
Acquire instance lock for INVOKE_INTERNAL
Modified:
ode/branches/APACHE_ODE_1.3.4.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
ode/branches/APACHE_ODE_1.3.4.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
ode/branches/APACHE_ODE_1.3.4.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Modified:
ode/branches/APACHE_ODE_1.3.4.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.3.4.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java?rev=947487&r1=947486&r2=947487&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.3.4.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
(original)
+++
ode/branches/APACHE_ODE_1.3.4.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
Sun May 23 20:20:56 2010
@@ -295,7 +295,7 @@ public interface Scheduler {
* processing.
* @author mszefler
*/
- public class JobProcessorException extends Exception {
+ public class JobProcessorException extends RuntimeException {
private static final long serialVersionUID = 1L;
public final boolean retry;
Modified:
ode/branches/APACHE_ODE_1.3.4.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.3.4.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?rev=947487&r1=947486&r2=947487&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.3.4.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
(original)
+++
ode/branches/APACHE_ODE_1.3.4.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
Sun May 23 20:20:56 2010
@@ -372,31 +372,36 @@ public class BpelEngineImpl implements B
if (process == null) return null;
return process.getOProcess();
}
-
- public void onScheduledJob(Scheduler.JobInfo jobInfo) throws
Scheduler.JobProcessorException {
- final JobDetails we = jobInfo.jobDetail;
-
- if( __log.isTraceEnabled() ) __log.trace("[JOB] onScheduledJob " +
jobInfo + "" + we.getInstanceId());
-
+
+ public void acquireInstanceLock(final Long iid) {
// We lock the instance to prevent concurrent transactions and prevent
unnecessary rollbacks,
// Note that we don't want to wait too long here to get our lock,
since we are likely holding
// on to scheduler's locks of various sorts.
try {
- _instanceLockManager.lock(we.getInstanceId(), 1,
TimeUnit.MICROSECONDS);
+ _instanceLockManager.lock(iid, 1, TimeUnit.MICROSECONDS);
_contexts.scheduler.registerSynchronizer(new
Scheduler.Synchronizer() {
public void afterCompletion(boolean success) {
- _instanceLockManager.unlock(we.getInstanceId());
+ _instanceLockManager.unlock(iid);
}
public void beforeCompletion() { }
});
} catch (InterruptedException e) {
// Retry later.
- __log.debug("Thread interrupted, job will be rescheduled: " +
jobInfo);
+ __log.debug("Thread interrupted, job will be rescheduled");
throw new Scheduler.JobProcessorException(true);
} catch
(org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) {
- __log.debug("Instance " + we.getInstanceId() + " is busy,
rescheduling job.");
+ __log.debug("Instance " + iid + " is busy, rescheduling job.");
throw new Scheduler.JobProcessorException(true);
}
+ }
+
+ public void onScheduledJob(Scheduler.JobInfo jobInfo) throws
Scheduler.JobProcessorException {
+ final JobDetails we = jobInfo.jobDetail;
+
+ if( __log.isTraceEnabled() ) __log.trace("[JOB] onScheduledJob " +
jobInfo + "" + we.getInstanceId());
+
+ acquireInstanceLock(we.getInstanceId());
+
// DONT PUT CODE HERE-need this method real tight in a try/catch
block, we need to handle
// all types of failure here, the scheduler is not going to know how
to handle our errors,
// ALSO we have to release the lock obtained above (IMPORTANT), lest
the whole system come
@@ -457,6 +462,8 @@ public class BpelEngineImpl implements B
} finally {
Thread.currentThread().setContextClassLoader(cl);
}
+ } catch (Scheduler.JobProcessorException e) {
+ throw e;
} catch (BpelEngineException bee) {
__log.error(__msgs.msgScheduledJobFailed(we), bee);
throw new Scheduler.JobProcessorException(bee, checkRetry(we));
Modified:
ode/branches/APACHE_ODE_1.3.4.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.3.4.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=947487&r1=947486&r2=947487&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.3.4.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
(original)
+++
ode/branches/APACHE_ODE_1.3.4.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Sun May 23 20:20:56 2010
@@ -284,6 +284,7 @@ public class BpelProcess {
return true;
} else if (routing.messageRoute != null) {
// Found a route, hitting it
+
_engine.acquireInstanceLock(routing.messageRoute.getTargetInstance().getInstanceId());
target.invokeInstance(mex, routing);
return true;
}