I made a patch for it.
I had to slightly modify InstanceLockManager so that the same thread can
acquire a lock twice. It's a kind of reentrant lock, for the case where the
lock was already taken in BpelEngineImpl.onScheduledJob.
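The sketch below shows one way to build such a lock. It assumes a per-instance lock table similar in spirit to InstanceLockManager. The class ReentrantInstanceLocks and its holdCount method are hypothetical, not ODE API. The patch's RECURRING branch grants re-entry without counting it. This version counts each re-acquisition instead, so the instance is released only after the matching number of unlock() calls. It also assumes unlock() runs on the thread that acquired the lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch (not ODE code): per-instance locks that the owning
// thread may take again; each lock() must be matched by one unlock().
final class ReentrantInstanceLocks {
    private static final class Holder {
        final Thread owner;
        int holds = 1;
        Holder(Thread owner) { this.owner = owner; }
    }

    private final ReentrantLock mutex = new ReentrantLock();
    private final Condition released = mutex.newCondition();
    private final Map<Long, Holder> locks = new HashMap<>();

    void lock(Long iid, long time, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (iid == null) return; // no instance id yet: nothing to lock on
        long remaining = unit.toNanos(time);
        mutex.lock();
        try {
            while (true) {
                Holder h = locks.get(iid);
                if (h == null) {                          // free: take it
                    locks.put(iid, new Holder(Thread.currentThread()));
                    return;
                }
                if (h.owner == Thread.currentThread()) {  // re-entry by the owner
                    h.holds++;
                    return;
                }
                if (remaining <= 0) throw new TimeoutException("iid=" + iid);
                remaining = released.awaitNanos(remaining); // wait for a release
            }
        } finally {
            mutex.unlock();
        }
    }

    void unlock(Long iid) {
        if (iid == null) return;
        mutex.lock();
        try {
            Holder h = locks.get(iid);
            if (h == null || h.owner != Thread.currentThread())
                throw new IllegalStateException("iid=" + iid + " not held by this thread");
            if (--h.holds == 0) {   // last matching unlock frees the instance
                locks.remove(iid);
                released.signalAll();
            }
        } finally {
            mutex.unlock();
        }
    }

    int holdCount(Long iid) {
        mutex.lock();
        try {
            Holder h = locks.get(iid);
            return h == null ? 0 : h.holds;
        } finally {
            mutex.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        ReentrantInstanceLocks locks = new ReentrantInstanceLocks();
        locks.lock(129983L, 1, TimeUnit.SECONDS); // e.g. taken in onScheduledJob
        locks.lock(129983L, 1, TimeUnit.SECONDS); // taken again later: granted
        System.out.println(locks.holdCount(129983L)); // prints 2
        locks.unlock(129983L);
        locks.unlock(129983L);
        System.out.println(locks.holdCount(129983L)); // prints 0
    }
}
```

Counting holds keeps the instance locked until the outermost caller is done. If re-entry is granted without counting, the first unlock() frees the instance while an outer caller may still expect to own it.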

Regards,

2009/2/20 Rafal Rusin <[email protected]>:
> I added some logging and saw the following execution path:
>
> 21:19:03,218 | DEBUG | pool-4-thread-1 | SimpleScheduler          |
> duler.simple.SimpleScheduler$4  378 | runJob
> hqejbhcnphr426la4ov8qn[0]: {type=INVOKE_INTERNAL, pid={...}Process,
> mexid=130132}
> 21:19:03,218 | DEBUG | pool-4-thread-1 | SimpleScheduler          |
> heduler.simple.SimpleScheduler  200 | Beginning a new transaction
> 21:19:03,220 | DEBUG | pool-4-thread-1 | InstanceLockManager      |
> pel.engine.InstanceLockManager   48 | Thread[pool-4-thread-1,5,main]:
> lock(iid=null, time=1MICROSECONDS)
> 21:19:03,220 | DEBUG | pool-4-thread-1 | InstanceLockManager      |
> pel.engine.InstanceLockManager   52 | Thread[pool-4-thread-1,5,main]:
> lock(iid=null, time=1MICROSECONDS)--> no locking for null iid
> 21:19:04,482 | DEBUG | pool-4-thread-1 | BpelRuntimeContextImpl   |
> .engine.BpelRuntimeContextImpl  161 | BpelRuntimeContextImpl created
> for instance 129983 8550354. INDEXED STATE=
> 21:19:04,497 | DEBUG | pool-4-thread-1 | BpelRuntimeContextImpl   |
> .engine.BpelRuntimeContextImpl  844 | BpelRuntimeContextImpl execute
> started for instance 129983 8550354. INDEXED STATE=
> 21:19:05,344 | DEBUG | pool-4-thread-1 | BpelRuntimeContextImpl   |
> .engine.BpelRuntimeContextImpl  851 | BpelRuntimeContextImpl setting
> execution state on instance 129983 8550354. INDEXED
> 21:19:05,359 | DEBUG | pool-4-thread-1 | SimpleScheduler          |
> heduler.simple.SimpleScheduler  215 | Commiting...
> 21:19:05,385 | DEBUG | pool-4-thread-1 | InstanceLockManager      |
> pel.engine.InstanceLockManager   90 | Thread[pool-4-thread-1,5,main]:
> unlock(iid=null)
>
> So here the INVOKE_INTERNAL job doesn't have an iid, so no lock is taken,
> and a BpelRuntimeContext is created, executed and saved on instance
> 129983.
> There was also a concurrent execution in another thread on instance 129983:
> 21:19:05,424 | DEBUG | pool-4-thread-3 | BpelRuntimeContextImpl   |
> .engine.BpelRuntimeContextImpl  851 | BpelRuntimeContextImpl setting
> execution state on instance 129983 13326304. INDEXED
>
> I saw that the INVOKE_INTERNAL job is created only in
> MyRoleMessageExchangeImpl, and it doesn't set an iid. That is reasonable
> behaviour, though, because at this point it isn't known yet which
> instance the message will go to (correct me if I'm wrong).
> Later, the INVOKE_INTERNAL job without an iid is processed, and it reaches
> BpelRuntimeContext creation through the following stack trace:
>        at org.apache.ode.bpel.engine.BpelRuntimeContextImpl.<init>(BpelRuntimeContextImpl.java:161)
>        at org.apache.ode.bpel.engine.BpelProcess.createRuntimeContext(BpelProcess.java:759)
>        at org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl.invokeInstance(PartnerLinkMyRoleImpl.java:219)
>        at org.apache.ode.bpel.engine.BpelProcess.invokeProcess(BpelProcess.java:219)
>        at org.apache.ode.bpel.engine.BpelProcess.handleWorkEvent(BpelProcess.java:402)
>        at org.apache.ode.bpel.engine.BpelEngineImpl.onScheduledJob(BpelEngineImpl.java:424)
>        at org.apache.ode.bpel.engine.BpelServerImpl.onScheduledJob(BpelServerImpl.java:377)
>        at org.apache.ode.scheduler.simple.SimpleScheduler$4$1.call(SimpleScheduler.java:390)
>        at org.apache.ode.scheduler.simple.SimpleScheduler$4$1.call(SimpleScheduler.java:384)
>        at org.apache.ode.scheduler.simple.SimpleScheduler.execTransaction(SimpleScheduler.java:208)
>        at org.apache.ode.scheduler.simple.SimpleScheduler$4.call(SimpleScheduler.java:383)
>        at org.apache.ode.scheduler.simple.SimpleScheduler$4.call(SimpleScheduler.java:376)
>        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:269)
>        at java.util.concurrent.FutureTask.run(FutureTask.java:123)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>        at java.lang.Thread.run(Thread.java:595)
>
> So here, in BpelProcess, it hits this branch:
>                        } else if (routing.messageRoute != null) {
>                            // Found a route, hitting it
>                            target.invokeInstance(mex, routing);
>                            routed = true;
>                        }
>
> So I think this job should be rescheduled here, with the iid set.
>
>
> 2009/2/16 Rafal Rusin <[email protected]>:
>> 2009/2/13 Matthieu Riou <[email protected]>:
>>> On Fri, Feb 13, 2009 at 2:10 AM, Rafal Rusin <[email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> could you explain a bit how BPEL execution works with respect to BpelRuntimeContext?
>>>> I saw the following happen:
>>>>
>>>> 09:59:25,185 | INFO  | pool-4-thread-4 | BpelRuntimeContextImpl   |
>>>> .engine.BpelRuntimeContextImpl  160 | BpelRuntimeContextImpl created
>>>> for instance 20598 24589427
>>>> 09:59:25,269 | INFO  | pool-4-thread-7 | BpelRuntimeContextImpl   |
>>>> .engine.BpelRuntimeContextImpl  160 | BpelRuntimeContextImpl created
>>>> for instance 20598 1821816
>>>> 09:59:25,628 | INFO  | pool-4-thread-4 | BpelRuntimeContextImpl   |
>>>> .engine.BpelRuntimeContextImpl  843 | BpelRuntimeContextImpl setting
>>>> execution state on instance 20598 24589427
>>>> 09:59:25,781 | INFO  | pool-4-thread-7 | BpelRuntimeContextImpl   |
>>>> .engine.BpelRuntimeContextImpl  843 | BpelRuntimeContextImpl setting
>>>> execution state on instance 20598 1821816
>>>>
>>>
>>> Is that on trunk or 1.x? In any case I'm puzzled as to how this is
>>> possible. There's a big lock on instances so that a given instance can't be
>>> executed by two threads in parallel. Check BpelEngineImpl.onScheduledJob.
>>> Given that INVOKE_RESPONSE handling goes through this method, it's
>>> surprising. Logging in the lock manager could provide more details.
>>
>> OK, thanks for clarifying, I'll check it and give more info.
>>
>>
>>> And the isolation level READ_COMMITTED is the correct one. With a "lower"
>>> isolation level, several assumptions coded in the engine would be broken as
>>> transactions start influencing each other (which gets tricky in a highly
>>> multi-threaded, shared environment).
>>>
>>> Thanks,
>>> Matthieu
>>>
>>>
>>>>
>>>> The numbers at the end (24589427 and 1821816) are the hash codes of the
>>>> BpelRuntimeContextImpl objects.
>>>> It happened after two INVOKE_RESPONSE jobs, for invoke1 and invoke3, were
>>>> executed concurrently. The BPEL process looked like this:
>>>> <flow>
>>>>  <sequence>
>>>>    <invoke1/>
>>>>    <invoke2/>
>>>>  </sequence>
>>>>  <sequence>
>>>>    <invoke3/>
>>>>    <invoke4/>
>>>>  </sequence>
>>>> </flow>
>>>>
>>>> My question is whether such execution should be synchronized in ODE.
>>>> Here, the execution state was set concurrently in pool-4-thread-4 and
>>>> pool-4-thread-7.
>>>> I used the transaction isolation level READ_COMMITTED (the default). In
>>>> this scenario, two jobs ran concurrently, both committed successfully,
>>>> and no critical section was used.
>>>> Both transactions saw only committed data, so READ_COMMITTED was honoured.
>>>> This led to an incorrect execution state being stored for the process
>>>> instance in the DB, since the work of one INVOKE_RESPONSE job was lost
>>>> when its data was overwritten.
>>>> What transaction isolation level is correct for ODE? And what should I
>>>> do to fix this scenario? Do you have any clues?
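The next sketch makes the lost update concrete in memory. It is a minimal Java sketch, and its names are hypothetical: LostUpdateDemo, runTwoJobs and storedState. A volatile String stands in for the stored execution state; there is no database here. Each job loads the state, appends its own progress and stores the result back.

Without a per-instance lock, a CyclicBarrier forces both jobs to read before either one writes. The second write then overwrites the first. This is similar to how two READ_COMMITTED transactions can each read only committed data and still lose an update.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: two jobs doing read-modify-write on one instance's state.
final class LostUpdateDemo {
    // Stands in for the persisted execution state of one process instance.
    static volatile String storedState = "";

    static String runTwoJobs(boolean useInstanceLock) throws InterruptedException {
        storedState = "";
        ReentrantLock instanceLock = new ReentrantLock();
        CyclicBarrier bothLoaded = new CyclicBarrier(2);
        Thread t1 = new Thread(job("[invoke1]", useInstanceLock, instanceLock, bothLoaded));
        Thread t2 = new Thread(job("[invoke3]", useInstanceLock, instanceLock, bothLoaded));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return storedState;
    }

    private static Runnable job(String progress, boolean useLock,
                                ReentrantLock lock, CyclicBarrier bothLoaded) {
        return () -> {
            if (useLock) lock.lock();
            try {
                String loaded = storedState;            // load committed state
                if (!useLock) awaitQuietly(bothLoaded); // unlocked: both load before either stores
                storedState = loaded + progress;        // store, overwriting any concurrent store
            } finally {
                if (useLock) lock.unlock();
            }
        };
    }

    private static void awaitQuietly(CyclicBarrier barrier) {
        try {
            barrier.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("without lock: " + runTwoJobs(false)); // only one job's progress survives
        System.out.println("with lock:    " + runTwoJobs(true));  // both jobs' progress survives
    }
}
```

The lock only helps if every job that touches the instance takes it. A job that skips locking, such as the INVOKE_INTERNAL job without an iid discussed above, brings the race back.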
>>>>
>>>> I got an error in Oracle:
>>>> 09:59:27,383 | ERROR | pool-4-thread-7 | JacobVPU                 |
>>>> b.vpu.JacobVPU$JacobThreadImpl  463 | Method "run" in class
>>>> "org.apache.ode.bpel.engine.BpelRuntimeContextImpl$6" threw an
>>>> unexpected exception.
>>>> java.lang.IllegalArgumentException: No such channel; id=225
>>>>        at org.apache.ode.jacob.vpu.ExecutionQueueImpl.findChannelFrame(ExecutionQueueImpl.java:205)
>>>>        at org.apache.ode.jacob.vpu.ExecutionQueueImpl.consumeExport(ExecutionQueueImpl.java:232)
>>>>        at org.apache.ode.jacob.vpu.JacobVPU$JacobThreadImpl.importChannel(JacobVPU.java:369)
>>>>        at org.apache.ode.jacob.JacobObject.importChannel(JacobObject.java:47)
>>>>        at org.apache.ode.bpel.engine.BpelRuntimeContextImpl$6.run(BpelRuntimeContextImpl.java:967)
>>>>        at sun.reflect.GeneratedMethodAccessor120.invoke(Unknown Source)
>>>>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>>>>        at java.lang.reflect.Method.invoke(Method.java:585)
>>>>        at org.apache.ode.jacob.vpu.JacobVPU$JacobThreadImpl.run(JacobVPU.java:451)
>>>>        at org.apache.ode.jacob.vpu.JacobVPU.execute(JacobVPU.java:139)
>>>>        at org.apache.ode.bpel.engine.BpelRuntimeContextImpl.execute(BpelRuntimeContextImpl.java:839)
>>>>        at org.apache.ode.bpel.engine.BpelProcess.handleWorkEvent(BpelProcess.java:418)
>>>>        at org.apache.ode.bpel.engine.BpelEngineImpl.onScheduledJob(BpelEngineImpl.java:424)
>>>>        at org.apache.ode.bpel.engine.BpelServerImpl.onScheduledJob(BpelServerImpl.java:377)
>>>>        at org.apache.ode.scheduler.simple.SimpleScheduler$4$1.call(SimpleScheduler.java:386)
>>>>        at org.apache.ode.scheduler.simple.SimpleScheduler$4$1.call(SimpleScheduler.java:380)
>>>>        at org.apache.ode.scheduler.simple.SimpleScheduler.execTransaction(SimpleScheduler.java:208)
>>>>        at org.apache.ode.scheduler.simple.SimpleScheduler$4.call(SimpleScheduler.java:379)
>>>>        at org.apache.ode.scheduler.simple.SimpleScheduler$4.call(SimpleScheduler.java:376)
>>>>        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:269)
>>>>        at java.util.concurrent.FutureTask.run(FutureTask.java:123)
>>>>        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>>>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>>>>        at java.lang.Thread.run(Thread.java:595)
>>>>
>>>>
>>>> And in Derby, there was a deadlock.
>>>>
>>>> Regards,
>>>> --
>>>> Rafał Rusin
>>>> www.mimuw.edu.pl/~rrusin <http://www.mimuw.edu.pl/%7Errusin>
>>>>
>>>
>>
>>
>>
>> --
>> Rafał Rusin
>> www.mimuw.edu.pl/~rrusin
>>
>
>
>
> --
> Rafał Rusin
> www.mimuw.edu.pl/~rrusin
>



-- 
Rafał Rusin
www.mimuw.edu.pl/~rrusin
Index: bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
===================================================================
--- bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java	(revision 738300)
+++ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java	(working copy)
@@ -41,6 +41,7 @@
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException;
 import org.apache.ode.bpel.evt.BpelEvent;
 import org.apache.ode.bpel.iapi.BpelEngine;
 import org.apache.ode.bpel.iapi.BpelEngineException;
@@ -345,7 +346,26 @@
         if (process == null) return null;
         return process.getOProcess();
     }
+    
+    public void acquireLockForIID(final long iid) {
+    	while (true) {
+	        try {
+				_instanceLockManager.lock(iid, 1, TimeUnit.MICROSECONDS);
 
+				_contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
+		            public void afterCompletion(boolean success) {
+		                _instanceLockManager.unlock(iid);
+		            }
+		            public void beforeCompletion() { }
+		        });
+				
+				return;
+			} catch (InterruptedException e) {
+			} catch (TimeoutException e) {
+			}
+    	}
+    }
+
     public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
         final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
 
Index: bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
===================================================================
--- bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java	(revision 738300)
+++ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java	(working copy)
@@ -188,6 +188,7 @@
         }
 
         ProcessInstanceDAO newInstance = processDAO.createInstance(routing.correlator);
+        _process._engine.acquireLockForIID(newInstance.getInstanceId());
 
         BpelRuntimeContextImpl instance = _process
                 .createRuntimeContext(newInstance, new PROCESS(_process.getOProcess()), mex);
@@ -215,6 +216,7 @@
 
         ProcessInstanceDAO instanceDao = routing.messageRoute.getTargetInstance();
 
+        _process._engine.acquireLockForIID(instanceDao.getInstanceId());
         // Reload process instance for DAO.
         BpelRuntimeContextImpl instance = _process.createRuntimeContext(instanceDao, null, null);
         instance.inputMsgMatch(routing.messageRoute.getGroupId(), routing.messageRoute.getIndex(), mex);
Index: bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java
===================================================================
--- bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java	(revision 738300)
+++ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java	(working copy)
@@ -43,12 +43,16 @@
     private final Map<Long, InstanceInfo> _locks = new HashMap<Long,InstanceInfo> ();
     
     public void lock(Long iid, int time, TimeUnit tu) throws InterruptedException, TimeoutException {
-        if (iid == null) return;
-
         String thrd = Thread.currentThread().toString();
         if (__log.isDebugEnabled())
             __log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu+")");
 
+        if (iid == null) {
+            if (__log.isDebugEnabled())
+                __log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu+")--> no locking for null iid");                    
+        	return;
+        }
+
         InstanceInfo li;
 
         _mutex.lock();
@@ -63,6 +67,12 @@
                         __log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu+")-->GRANTED");                    
                     return;
                 } else {
+                	if (li.acquierer.equals(Thread.currentThread())) {
+                        if (__log.isDebugEnabled())
+                            __log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu+")-->RECURRING, GRANTED");
+                        return;
+                	}
+                	
                     if (__log.isDebugEnabled())
                         __log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu+")-->WAITING(held by " + li.acquierer + ")");
                     
@@ -81,17 +91,22 @@
     }
     
     public void unlock(Long iid)  {
-        if (iid == null) return;
-
         String thrd = Thread.currentThread().toString();
         if (__log.isDebugEnabled())
             __log.debug(thrd + ": unlock(iid=" + iid + ")");        
 
+        if (iid == null) {
+            if (__log.isDebugEnabled())
+                __log.debug(thrd + ": unlock(iid=" + iid + ")--> no unlocking for null iid");
+        	return;
+        }
         _mutex.lock();
         try {
             InstanceInfo li = _locks.get(iid);
-            if (li == null)
-                throw new IllegalStateException("Instance not locked, cannot unlock!");
+            if (li == null) {
+                __log.debug(thrd + ": Instance not locked");
+                return;
+            }
             
             _locks.remove(iid);
             

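For comparison, the same per-instance locking can be built on java.util.concurrent.locks.ReentrantLock, which already counts re-entrant holds. In this minimal sketch, each unlock is deferred until the transaction completes, much as the patch registers afterCompletion() with the scheduler. Tx and TxScopedInstanceLocks are hypothetical stand-ins, not ODE's Scheduler.Synchronizer API. The sketch assumes completion callbacks run on the thread that took the locks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: per-instance ReentrantLocks released when the
// surrounding transaction completes (commit or rollback).
final class TxScopedInstanceLocks {
    // Hypothetical stand-in for a scheduler transaction with completion callbacks.
    static final class Tx {
        private final List<Runnable> afterCompletion = new ArrayList<>();

        void registerAfterCompletion(Runnable callback) { afterCompletion.add(callback); }

        void complete() {                      // runs on commit and on rollback
            for (Runnable callback : afterCompletion) callback.run();
            afterCompletion.clear();
        }
    }

    private final ConcurrentHashMap<Long, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Blocks until this thread holds the instance lock (re-entry allowed), then
    // registers exactly one matching unlock for when the transaction completes.
    void acquireForTx(long iid, Tx tx) throws InterruptedException {
        ReentrantLock lock = locks.computeIfAbsent(iid, k -> new ReentrantLock());
        lock.lockInterruptibly();             // propagates interruption instead of retrying
        tx.registerAfterCompletion(lock::unlock);
    }

    int holdCount(long iid) {                  // holds by the current thread
        ReentrantLock lock = locks.get(iid);
        return lock == null ? 0 : lock.getHoldCount();
    }

    public static void main(String[] args) throws InterruptedException {
        TxScopedInstanceLocks locks = new TxScopedInstanceLocks();
        Tx tx = new Tx();
        locks.acquireForTx(129983L, tx);      // e.g. when the job starts
        locks.acquireForTx(129983L, tx);      // again once routing finds the instance
        System.out.println(locks.holdCount(129983L)); // prints 2
        tx.complete();
        System.out.println(locks.holdCount(129983L)); // prints 0
    }
}
```

Each acquisition registers exactly one unlock, so the two callbacks balance the two holds. This means unlock never has to tolerate an instance that is no longer locked. To keep the sketch short, lock objects are never removed from the map; a real version would need some cleanup.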