I made a patch for it.
I had to modify InstanceLockManager slightly so that the same thread can
acquire a lock twice: a kind of reentrant lock, for the case where the lock
was already taken in BpelEngineImpl.onScheduledJob.
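A minimal sketch of a reentrant per-instance lock table in plain java.util.concurrent, assuming locks are keyed by instance id. ReentrantInstanceLocks and holdCount are hypothetical names, and java.util.concurrent.TimeoutException stands in for InstanceLockManager.TimeoutException. Unlike the patch, which returns early on re-entry without counting, it keeps a hold count per owner, so an instance is freed only after every lock() has a matching unlock().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical reentrant per-instance lock table; ODE's InstanceLockManager differs.
final class ReentrantInstanceLocks {
    private static final class Holder {
        final Thread owner;
        int holds = 1;
        Holder(Thread owner) { this.owner = owner; }
    }

    private final ReentrantLock mutex = new ReentrantLock();
    private final Condition released = mutex.newCondition();
    private final Map<Long, Holder> locks = new HashMap<>();

    /** Acquires the lock for iid, re-entering if the calling thread already owns it. */
    void lock(long iid, long time, TimeUnit unit) throws InterruptedException, TimeoutException {
        long remaining = unit.toNanos(time);
        mutex.lock();
        try {
            while (true) {
                Holder h = locks.get(iid);
                if (h == null) {                              // free: take it
                    locks.put(iid, new Holder(Thread.currentThread()));
                    return;
                }
                if (h.owner == Thread.currentThread()) {      // re-entry: count it
                    h.holds++;
                    return;
                }
                if (remaining <= 0)
                    throw new TimeoutException("instance " + iid + " held by " + h.owner);
                remaining = released.awaitNanos(remaining);   // wait for some unlock
            }
        } finally {
            mutex.unlock();
        }
    }

    /** Releases one hold; the instance becomes free only when all holds are released. */
    void unlock(long iid) {
        mutex.lock();
        try {
            Holder h = locks.get(iid);
            if (h == null || h.owner != Thread.currentThread())
                throw new IllegalMonitorStateException("instance " + iid + " not held by caller");
            if (--h.holds == 0) {
                locks.remove(iid);
                released.signalAll();
            }
        } finally {
            mutex.unlock();
        }
    }

    /** Number of holds the calling thread has on iid (0 if none). */
    int holdCount(long iid) {
        mutex.lock();
        try {
            Holder h = locks.get(iid);
            return h != null && h.owner == Thread.currentThread() ? h.holds : 0;
        } finally {
            mutex.unlock();
        }
    }
}
```

With the patch as written, the first unlock() releases the instance outright, which is why its unlock() also has to tolerate an instance that is no longer locked.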
Regards,
2009/2/20 Rafal Rusin:
> I added some logging and saw the following execution path:
>
> 21:19:03,218 | DEBUG | pool-4-thread-1 | SimpleScheduler |
> duler.simple.SimpleScheduler$4 378 | runJob
> hqejbhcnphr426la4ov8qn[0]: {type=INVOKE_INTERNAL, pid={...}Process,
> mexid=130132}
> 21:19:03,218 | DEBUG | pool-4-thread-1 | SimpleScheduler |
> heduler.simple.SimpleScheduler 200 | Beginning a new transaction
> 21:19:03,220 | DEBUG | pool-4-thread-1 | InstanceLockManager |
> pel.engine.InstanceLockManager 48 | Thread[pool-4-thread-1,5,main]:
> lock(iid=null, time=1MICROSECONDS)
> 21:19:03,220 | DEBUG | pool-4-thread-1 | InstanceLockManager |
> pel.engine.InstanceLockManager 52 | Thread[pool-4-thread-1,5,main]:
> lock(iid=null, time=1MICROSECONDS)--> no locking for null iid
> 21:19:04,482 | DEBUG | pool-4-thread-1 | BpelRuntimeContextImpl |
> .engine.BpelRuntimeContextImpl 161 | BpelRuntimeContextImpl created
> for instance 129983 8550354. INDEXED STATE=
> 21:19:04,497 | DEBUG | pool-4-thread-1 | BpelRuntimeContextImpl |
> .engine.BpelRuntimeContextImpl 844 | BpelRuntimeContextImpl execute
> started for instance 129983 8550354. INDEXED STATE=
> 21:19:05,344 | DEBUG | pool-4-thread-1 | BpelRuntimeContextImpl |
> .engine.BpelRuntimeContextImpl 851 | BpelRuntimeContextImpl setting
> execution state on instance 129983 8550354. INDEXED
> 21:19:05,359 | DEBUG | pool-4-thread-1 | SimpleScheduler |
> heduler.simple.SimpleScheduler 215 | Commiting...
> 21:19:05,385 | DEBUG | pool-4-thread-1 | InstanceLockManager |
> pel.engine.InstanceLockManager 90 | Thread[pool-4-thread-1,5,main]:
> unlock(iid=null)
>
> So here the INVOKE_INTERNAL job doesn't have an iid, so no lock is taken,
> and a BpelRuntimeContext is created, executed and saved for instance
> 129983.
> At the same time, another thread was also executing instance 129983:
> 21:19:05,424 | DEBUG | pool-4-thread-3 | BpelRuntimeContextImpl |
> .engine.BpelRuntimeContextImpl 851 | BpelRuntimeContextImpl setting
> execution state on instance 129983 13326304. INDEXED
>
> I saw that the INVOKE_INTERNAL job is created only in
> MyRoleMessageExchangeImpl, and it doesn't set the iid. That is correct
> behaviour, though, because at this point there's no way to know which
> instance the message will be routed to (correct me if I'm wrong).
> Later, the INVOKE_INTERNAL job without an iid is processed, and it reaches
> the BpelRuntimeContextImpl constructor through the following stack trace:
>     at org.apache.ode.bpel.engine.BpelRuntimeContextImpl.<init>(BpelRuntimeContextImpl.java:161)
>     at org.apache.ode.bpel.engine.BpelProcess.createRuntimeContext(BpelProcess.java:759)
>     at org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl.invokeInstance(PartnerLinkMyRoleImpl.java:219)
>     at org.apache.ode.bpel.engine.BpelProcess.invokeProcess(BpelProcess.java:219)
>     at org.apache.ode.bpel.engine.BpelProcess.handleWorkEvent(BpelProcess.java:402)
>     at org.apache.ode.bpel.engine.BpelEngineImpl.onScheduledJob(BpelEngineImpl.java:424)
>     at org.apache.ode.bpel.engine.BpelServerImpl.onScheduledJob(BpelServerImpl.java:377)
>     at org.apache.ode.scheduler.simple.SimpleScheduler$4$1.call(SimpleScheduler.java:390)
>     at org.apache.ode.scheduler.simple.SimpleScheduler$4$1.call(SimpleScheduler.java:384)
>     at org.apache.ode.scheduler.simple.SimpleScheduler.execTransaction(SimpleScheduler.java:208)
>     at org.apache.ode.scheduler.simple.SimpleScheduler$4.call(SimpleScheduler.java:383)
>     at org.apache.ode.scheduler.simple.SimpleScheduler$4.call(SimpleScheduler.java:376)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:269)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:123)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>     at java.lang.Thread.run(Thread.java:595)
>
> So here in BpelProcess it hits this branch:
>     } else if (routing.messageRoute != null) {
>         // Found a route, hitting it
>         target.invokeInstance(mex, routing);
>         routed = true;
>     }
>
> So I think this job should be rescheduled here, with the iid set.
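Rescheduling is one option; the attached patch instead takes the instance lock inside PartnerLinkMyRoleImpl once routing has resolved the target instance. A minimal sketch of that lock-after-routing idea, using java.util.concurrent.locks.ReentrantLock, which already counts re-entries by the owning thread. InstanceJobRunner, resolveTarget and work are hypothetical stand-ins, not ODE classes; the lock map is never pruned, which a real implementation would need to handle.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical job runner illustrating "lock the instance once routing has resolved it".
final class InstanceJobRunner {
    private final ConcurrentHashMap<Long, ReentrantLock> locks = new ConcurrentHashMap<>();

    private ReentrantLock lockFor(long iid) {
        return locks.computeIfAbsent(iid, k -> new ReentrantLock());
    }

    /**
     * @param jobIid        iid carried by the job, or null (as with INVOKE_INTERNAL)
     * @param resolveTarget stand-in for message routing; called only when jobIid is null
     * @param work          the instance execution, given the resolved iid
     */
    void run(Long jobIid, LongSupplier resolveTarget, LongConsumer work) {
        // Scheduler-level lock: only possible when the job already knows its instance.
        ReentrantLock outer = jobIid == null ? null : lockFor(jobIid);
        if (outer != null) outer.lock();
        try {
            long target = jobIid != null ? jobIid : resolveTarget.getAsLong();
            // Engine-level lock after routing; re-entrant if outer already covers target.
            ReentrantLock inner = lockFor(target);
            inner.lock();
            try {
                work.accept(target);
            } finally {
                inner.unlock();
            }
        } finally {
            if (outer != null) outer.unlock();
        }
    }
}
```

Two jobs with a null iid that route to the same instance are serialized by the inner lock, while a job that already carries its iid simply re-enters the lock it holds.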
>
>
> 2009/2/16 Rafal Rusin:
>> 2009/2/13 Matthieu Riou:
>>> On Fri, Feb 13, 2009 at 2:10 AM, Rafal Rusin wrote:
>>>
>>>> Hello,
>>>>
>>>> could you explain a bit how BPEL execution works with respect to
>>>> BpelRuntimeContext? I saw the following happen:
>>>>
>>>> 09:59:25,185 | INFO | pool-4-thread-4 | BpelRuntimeContextImpl |
>>>> .engine.BpelRuntimeContextImpl 160 | BpelRuntimeContextImpl created
>>>> for instance 20598 24589427
>>>> 09:59:25,269 | INFO | pool-4-thread-7 | BpelRuntimeContextImpl |
>>>> .engine.BpelRuntimeContextImpl 160 | BpelRuntimeContextImpl created
>>>> for instance 20598 1821816
>>>> 09:59:25,628 | INFO | pool-4-thread-4 | BpelRuntimeContextImpl |
>>>> .engine.BpelRuntimeContextImpl 843 | BpelRuntimeContextImpl setting
>>>> execution state on instance 20598 24589427
>>>> 09:59:25,781 | INFO | pool-4-thread-7 | BpelRuntimeContextImpl |
>>>> .engine.BpelRuntimeContextImpl 843 | BpelRuntimeContextImpl setting
>>>> execution state on instance 20598 1821816
>>>>
>>>
>>> Is that on trunk or 1.x? In any case, I'm puzzled as to how this is
>>> possible. There's a big lock on instances so that a given instance can't be
>>> executed by two threads in parallel. Check BpelEngineImpl.onScheduledJob.
>>> Given that INVOKE_RESPONSE handling goes through this method, it's
>>> surprising. Logging on the lock manager could provide more details.
>>
>> OK, thanks for clarifying. I'll check it and report back with more info.
>>
>>
>>> And the isolation level READ_COMMITTED is the correct one. With a "lower"
>>> isolation level, several assumptions coded in the engine would be broken as
>>> transactions start influencing each other (which gets tricky in a highly
>>> multi-threaded, shared environment).
>>>
>>> Thanks,
>>> Matthieu
>>>
>>>
>>>>
>>>> The numbers at the end (24589427 and 1821816) are the hash codes of the
>>>> BpelRuntimeContextImpl objects.
>>>> It happened after two INVOKE_RESPONSE jobs, for invoke1 and invoke3, were
>>>> executed concurrently. The BPEL process looked like this:
>>>> <flow>
>>>>   <sequence>
>>>>     <invoke1/>
>>>>     <invoke2/>
>>>>   </sequence>
>>>>   <sequence>
>>>>     <invoke3/>
>>>>     <invoke4/>
>>>>   </sequence>
>>>> </flow>
>>>>
>>>> My question is whether ODE should synchronize such execution. Here, the
>>>> execution state was set concurrently in pool-4-thread-4 and pool-4-thread-7.
>>>> I used the READ_COMMITTED transaction isolation level (the default). In
>>>> this scenario, two jobs ran concurrently, both committed successfully,
>>>> and no critical section was used.
>>>> Both transactions saw only committed data, so READ_COMMITTED held.
>>>> It led to an incorrect execution state being stored for the process
>>>> instance in the DB, since the work of one INVOKE_RESPONSE job was lost
>>>> when its data was overwritten.
>>>> Which transaction isolation level is correct for ODE? And what should I
>>>> do to fix this scenario? Do you have any clues?
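The lost update described above can be reproduced without a database. In this hypothetical model (LostUpdateDemo, InstanceRow and runTwoJobs are illustrative names, not ODE code), each job loads the instance state, modifies it, and saves it back whole, loosely mirroring how each job only ever reads committed state. A latch forces both loads to happen before either save; without a per-instance lock one job's work is overwritten, and with the lock the jobs are serialized.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of two INVOKE_RESPONSE jobs doing load-modify-save on one instance.
final class LostUpdateDemo {
    /** Stand-in for the persisted execution state of a single process instance. */
    static final class InstanceRow {
        volatile int completedInvokes;
    }

    /** Runs two jobs against one row and returns the final saved state. */
    static int runTwoJobs(boolean lockInstance) throws InterruptedException {
        InstanceRow row = new InstanceRow();
        ReentrantLock instanceLock = new ReentrantLock();
        CountDownLatch bothLoaded = new CountDownLatch(2);
        Runnable job = () -> {
            if (lockInstance) instanceLock.lock();
            try {
                int loaded = row.completedInvokes;            // load committed state
                if (!lockInstance) {
                    // Force the unlucky interleaving: nobody saves until both have loaded.
                    bothLoaded.countDown();
                    try {
                        bothLoaded.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                row.completedInvokes = loaded + 1;            // save overwrites the whole state
            } finally {
                if (lockInstance) instanceLock.unlock();
            }
        };
        Thread t1 = new Thread(job);
        Thread t2 = new Thread(job);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return row.completedInvokes;
    }
}
```

runTwoJobs(false) returns 1 (one job's work is lost), while runTwoJobs(true) returns 2; a stricter isolation level is not what fixes this in the model, the mutual exclusion is.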
>>>>
>>>> I got an error on Oracle:
>>>> 09:59:27,383 | ERROR | pool-4-thread-7 | JacobVPU |
>>>> b.vpu.JacobVPU$JacobThreadImpl 463 | Method "run" in class
>>>> "org.apache.ode.bpel.engine.BpelRuntimeContextImpl$6" threw an
>>>> unexpected exception.
>>>> java.lang.IllegalArgumentException: No such channel; id=225
>>>>     at org.apache.ode.jacob.vpu.ExecutionQueueImpl.findChannelFrame(ExecutionQueueImpl.java:205)
>>>>     at org.apache.ode.jacob.vpu.ExecutionQueueImpl.consumeExport(ExecutionQueueImpl.java:232)
>>>>     at org.apache.ode.jacob.vpu.JacobVPU$JacobThreadImpl.importChannel(JacobVPU.java:369)
>>>>     at org.apache.ode.jacob.JacobObject.importChannel(JacobObject.java:47)
>>>>     at org.apache.ode.bpel.engine.BpelRuntimeContextImpl$6.run(BpelRuntimeContextImpl.java:967)
>>>>     at sun.reflect.GeneratedMethodAccessor120.invoke(Unknown Source)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>>>>     at java.lang.reflect.Method.invoke(Method.java:585)
>>>>     at org.apache.ode.jacob.vpu.JacobVPU$JacobThreadImpl.run(JacobVPU.java:451)
>>>>     at org.apache.ode.jacob.vpu.JacobVPU.execute(JacobVPU.java:139)
>>>>     at org.apache.ode.bpel.engine.BpelRuntimeContextImpl.execute(BpelRuntimeContextImpl.java:839)
>>>>     at org.apache.ode.bpel.engine.BpelProcess.handleWorkEvent(BpelProcess.java:418)
>>>>     at org.apache.ode.bpel.engine.BpelEngineImpl.onScheduledJob(BpelEngineImpl.java:424)
>>>>     at org.apache.ode.bpel.engine.BpelServerImpl.onScheduledJob(BpelServerImpl.java:377)
>>>>     at org.apache.ode.scheduler.simple.SimpleScheduler$4$1.call(SimpleScheduler.java:386)
>>>>     at org.apache.ode.scheduler.simple.SimpleScheduler$4$1.call(SimpleScheduler.java:380)
>>>>     at org.apache.ode.scheduler.simple.SimpleScheduler.execTransaction(SimpleScheduler.java:208)
>>>>     at org.apache.ode.scheduler.simple.SimpleScheduler$4.call(SimpleScheduler.java:379)
>>>>     at org.apache.ode.scheduler.simple.SimpleScheduler$4.call(SimpleScheduler.java:376)
>>>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:269)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:123)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>>>>     at java.lang.Thread.run(Thread.java:595)
>>>>
>>>>
>>>> And on Derby, there was a deadlock.
>>>>
>>>> Regards,
>>>> --
>>>> Rafał Rusin
>>>> www.mimuw.edu.pl/~rrusin <http://www.mimuw.edu.pl/%7Errusin>
>>>>
>>>
--
Rafał Rusin
www.mimuw.edu.pl/~rrusin
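The acquireLockForIID method in the attached patch retries lock() with a 1-microsecond timeout in an unbounded loop, ignores InterruptedException, and releases the lock from a Scheduler.Synchronizer when the transaction completes. A minimal sketch of the same release-on-completion idea with a bounded wait and exactly one release per acquisition; Synchronizer and Tx here are hypothetical stand-ins shaped after the patch, not ODE's Scheduler API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical transaction-scoped instance locks; not ODE's BpelEngineImpl.
final class TxScopedInstanceLocks {
    /** Hypothetical callback, shaped like the Scheduler.Synchronizer used in the patch. */
    interface Synchronizer {
        void beforeCompletion();
        void afterCompletion(boolean success);
    }

    /** Hypothetical minimal transaction that runs registered synchronizers on completion. */
    static final class Tx {
        private final List<Synchronizer> syncs = new ArrayList<>();
        void register(Synchronizer s) { syncs.add(s); }
        void complete(boolean success) {
            for (Synchronizer s : syncs) s.beforeCompletion();
            for (Synchronizer s : syncs) s.afterCompletion(success);
            syncs.clear();
        }
    }

    private final ConcurrentHashMap<Long, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Waits up to the timeout, propagates interrupts, and unlocks when tx completes. */
    void acquireForTx(long iid, Tx tx, long timeout, TimeUnit unit) throws InterruptedException {
        ReentrantLock lock = locks.computeIfAbsent(iid, k -> new ReentrantLock());
        if (!lock.tryLock(timeout, unit))
            throw new IllegalStateException("timed out waiting for instance " + iid);
        // One registered release per successful acquisition keeps re-entry balanced.
        tx.register(new Synchronizer() {
            public void beforeCompletion() { }
            public void afterCompletion(boolean success) { lock.unlock(); }
        });
    }

    boolean isHeldByCurrentThread(long iid) {
        ReentrantLock lock = locks.get(iid);
        return lock != null && lock.isHeldByCurrentThread();
    }
}
```

Because afterCompletion runs on the thread that acquired the lock in this model, ReentrantLock.unlock() is legal there; if a real transaction manager invoked completion callbacks on a different thread, an owner-checked lock like this would not fit.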
Index: bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
===================================================================
--- bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (revision 738300)
+++ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (working copy)
@@ -41,6 +41,7 @@
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.iapi.BpelEngine;
import org.apache.ode.bpel.iapi.BpelEngineException;
@@ -345,7 +346,26 @@
if (process == null) return null;
return process.getOProcess();
}
+
+ public void acquireLockForIID(final long iid) {
+ while (true) {
+ try {
+ _instanceLockManager.lock(iid, 1, TimeUnit.MICROSECONDS);
+ _contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
+ public void afterCompletion(boolean success) {
+ _instanceLockManager.unlock(iid);
+ }
+ public void beforeCompletion() { }
+ });
+
+ return;
+ } catch (InterruptedException e) {
+ } catch (TimeoutException e) {
+ }
+ }
+ }
+
public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
Index: bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
===================================================================
--- bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (revision 738300)
+++ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (working copy)
@@ -188,6 +188,7 @@
}
ProcessInstanceDAO newInstance = processDAO.createInstance(routing.correlator);
+ _process._engine.acquireLockForIID(newInstance.getInstanceId());
BpelRuntimeContextImpl instance = _process
.createRuntimeContext(newInstance, new PROCESS(_process.getOProcess()), mex);
@@ -215,6 +216,7 @@
ProcessInstanceDAO instanceDao = routing.messageRoute.getTargetInstance();
+ _process._engine.acquireLockForIID(instanceDao.getInstanceId());
// Reload process instance for DAO.
BpelRuntimeContextImpl instance = _process.createRuntimeContext(instanceDao, null, null);
instance.inputMsgMatch(routing.messageRoute.getGroupId(), routing.messageRoute.getIndex(), mex);
Index: bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java
===================================================================
--- bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java (revision 738300)
+++ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java (working copy)
@@ -43,12 +43,16 @@
private final Map<Long, InstanceInfo> _locks = new HashMap<Long,InstanceInfo> ();
public void lock(Long iid, int time, TimeUnit tu) throws InterruptedException, TimeoutException {
- if (iid == null) return;
-
String thrd = Thread.currentThread().toString();
if (__log.isDebugEnabled())
__log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu+")");
+ if (iid == null) {
+ if (__log.isDebugEnabled())
+ __log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu+")--> no locking for null iid");
+ return;
+ }
+
InstanceInfo li;
_mutex.lock();
@@ -63,6 +67,12 @@
__log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu+")-->GRANTED");
return;
} else {
+ if (li.acquierer.equals(Thread.currentThread())) {
+ if (__log.isDebugEnabled())
+ __log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu+")-->RECURRING, GRANTED");
+ return;
+ }
+
if (__log.isDebugEnabled())
__log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu+")-->WAITING(held by " + li.acquierer + ")");
@@ -81,17 +91,22 @@
}
public void unlock(Long iid) {
- if (iid == null) return;
-
String thrd = Thread.currentThread().toString();
if (__log.isDebugEnabled())
__log.debug(thrd + ": unlock(iid=" + iid + ")");
+ if (iid == null) {
+ if (__log.isDebugEnabled())
+ __log.debug(thrd + ": unlock(iid=" + iid + ")--> no unlocking for null iid");
+ return;
+ }
_mutex.lock();
try {
InstanceInfo li = _locks.get(iid);
- if (li == null)
- throw new IllegalStateException("Instance not locked, cannot unlock!");
+ if (li == null) {
+ __log.debug(thrd + ": Instance not locked");
+ return;
+ }
_locks.remove(iid);