Hi,
I'm having a hard time finding any detailed documentation about the maxLockTime 
of the JobExecutor.

So I resorted to looking at the code, but I'm still not sure I understand its 
intent and implementation.

Background: We are using jBPM to implement data processing flows. We use 
async nodes and have multiple JobExecutor threads running. Some of the async 
steps may take hours (it's an automated process, so Node.executeAction() chews 
on the data for a long time). This is being done with jBPM 3.2.1 and the jar 
that is attached to http://jira.jboss.com/jira/browse/JBPM-1042

During this we discovered a problem when a node takes longer than maxLockTime 
(which defaults to 600000 ms, i.e. 10 minutes). The executeAction() for the 
node completed successfully, but jBPM still chose to roll back the 
transaction, even though no exception was thrown and we properly took a 
transition to the next node.

Question: If the execute method on the Node completes w/o error, did its job 
and moved on to the next node, why does it make sense to roll back the process 
instance?

Our workaround is to just set maxLockTime = MAXINT.

See the code below, from JobExecutorThread.java:

  protected void executeJob(Job job) {
    JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
    try {
      JobSession jobSession = jbpmContext.getJobSession();
      job = jobSession.loadJob(job.getId());

      try {
        log.debug("executing job "+job);
        if (job.execute(jbpmContext)) {
          jobSession.deleteJob(job);
        }

      } catch (Exception e) {
        log.debug("exception while executing '"+job+"'", e);
        StringWriter sw = new StringWriter();
        e.printStackTrace(new PrintWriter(sw));
        job.setException(sw.toString());
        job.setRetries(job.getRetries()-1);
      }

      // if this job is locked too long
      long totalLockTimeInMillis = System.currentTimeMillis() - job.getLockTime().getTime();
      if (totalLockTimeInMillis>maxLockTime) {
        jbpmContext.setRollbackOnly();
      }

    } finally {
      try {
        jbpmContext.close();
      } catch (RuntimeException e) {
        log.error("problem committing job execution transaction", e);
        throw e;
      }
    }
  }
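
To make the behaviour concrete, the lock-time check at the end of executeJob 
can be isolated into a small standalone sketch. The class and method names 
(LockTimeCheck, exceedsMaxLockTime) are made up for illustration and are not 
part of jBPM. The sketch also assumes that maxLockTime is an int holding 
milliseconds, which is why MAXINT is written as Integer.MAX_VALUE.

```java
public class LockTimeCheck {

    // Mirrors the comparison in executeJob: time elapsed since the job was
    // locked versus maxLockTime. All values are in milliseconds.
    public static boolean exceedsMaxLockTime(long lockTimeMillis, long nowMillis, int maxLockTime) {
        long totalLockTimeInMillis = nowMillis - lockTimeMillis;
        return totalLockTimeInMillis > maxLockTime;
    }

    public static void main(String[] args) {
        long lockedAt = 0L;
        long twoHoursLater = 2L * 60 * 60 * 1000; // a node that ran for two hours

        // With the 10 minute default, a job that finished cleanly is still
        // flagged for rollback.
        System.out.println(exceedsMaxLockTime(lockedAt, twoHoursLater, 600000));

        // With the workaround, the check does not fire for a two hour job.
        System.out.println(exceedsMaxLockTime(lockedAt, twoHoursLater, Integer.MAX_VALUE));

        // Integer.MAX_VALUE milliseconds is a finite limit, not "forever".
        System.out.println(Integer.MAX_VALUE / (24L * 60 * 60 * 1000) + " full days");
    }
}
```

Under that int assumption the workaround does not remove the limit, it only 
moves it to a little under 25 days.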


While reading the code to try to figure this out, I noticed the following 
things that I didn't understand. Any insight into what I'm missing would be 
great.

1. JobExecutor.start() creates a new instance of LockMonitorThread. But I can't 
find where that thread is ever started.   Doesn't seem to make sense to create 
the thread and never start it. Is the intent that the thread be running?

  public synchronized void start() {
    if (! isStarted) {
      log.debug("starting thread group '"+name+"'...");
      for (int i=0; i<nbrOfThreads; i++) {
        startThread();
      }
      isStarted = true;
    } else {
      log.debug("ignoring start: thread group '"+name+"' is already started'");
    }

    lockMonitorThread = new LockMonitorThread(jbpmConfiguration, lockMonitorInterval, maxLockTime, lockBufferTime);
  }
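
For reference, in plain Java constructing a Thread object does not run it; 
nothing executes until start() is called. The minimal sketch below is generic 
Java, not jBPM code, and the class name is hypothetical. It assumes 
LockMonitorThread is an ordinary java.lang.Thread subclass.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class UnstartedThreadDemo {

    // Returns { ranBeforeStart, ranAfterStart }.
    public static boolean[] run() {
        AtomicBoolean ran = new AtomicBoolean(false);
        Thread monitor = new Thread(() -> ran.set(true), "lock-monitor");

        // Constructed but not started: the thread is in state NEW and its
        // run() method has not executed.
        boolean ranBeforeStart = ran.get();
        System.out.println("before start: state=" + monitor.getState() + ", ran=" + ranBeforeStart);

        monitor.start();
        try {
            monitor.join(); // wait for run() to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        boolean ranAfterStart = ran.get();
        System.out.println("after start: ran=" + ranAfterStart);

        return new boolean[] { ranBeforeStart, ranAfterStart };
    }

    public static void main(String[] args) {
        run();
    }
}
```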

2. But if the LockMonitorThread were running, I think it would lead to some 
potentially harmful side effects. It updates a job's lock, but the 
JobExecutorThread that is running the job is going to continue processing, 
firing events and taking the transition to the next node until a wait state is 
reached. As far as I can tell, no code ever looks at the lock on the job. To 
me it looks like it would simply reset the lock owner and the lock time so 
that another instance of JobExecutorThread could acquire the job even though 
it is still running in another thread.

From LockMonitorThread:

  protected void unlockOverdueJobs() {
    JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
    try {
      JobSession jobSession = jbpmContext.getJobSession();

      Date treshold = new Date(System.currentTimeMillis()-maxLockTime-lockBufferTime);
      List jobsWithOverdueLockTime = jobSession.findJobsWithOverdueLockTime(treshold);
      Iterator iter = jobsWithOverdueLockTime.iterator();
      while (iter.hasNext()) {
        Job job = (Job) iter.next();
        // unlock
        log.debug("unlocking "+job+ " owned by thread "+job.getLockOwner());
        job.setLockOwner(null);
        job.setLockTime(null);
        jobSession.saveJob(job);
      }

    } finally {
      try {
        jbpmContext.close();
      } catch (RuntimeException e) {
        log.error("problem committing job execution transaction", e);
        throw e;
      }
    }
  }
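
The scenario I'm worried about can be replayed with a small in-memory sketch. 
This is not jBPM code: LockRaceSketch, its Job class and tryAcquire are 
hypothetical stand-ins, the lockBufferTime value is made up, and the sketch 
assumes two things I have not verified in the source, namely that a job 
becomes acquirable as soon as its lock owner is null and that "overdue" means 
a lock time earlier than the threshold.

```java
import java.util.Date;

public class LockRaceSketch {

    // Simplified stand-in for a jBPM Job: only the two lock fields.
    public static class Job {
        public String lockOwner;
        public Date lockTime;
    }

    // ASSUMPTION: a job can be acquired whenever it has no lock owner.
    public static boolean tryAcquire(Job job, String owner, long nowMillis) {
        if (job.lockOwner != null) {
            return false;
        }
        job.lockOwner = owner;
        job.lockTime = new Date(nowMillis);
        return true;
    }

    // Same rule as unlockOverdueJobs: clear any lock older than
    // now - maxLockTime - lockBufferTime.
    public static void unlockIfOverdue(Job job, long nowMillis, int maxLockTime, int lockBufferTime) {
        Date threshold = new Date(nowMillis - maxLockTime - lockBufferTime);
        if (job.lockTime != null && job.lockTime.before(threshold)) {
            job.lockOwner = null;
            job.lockTime = null;
        }
    }

    // Replays the sequence and reports whether a second thread obtained the
    // job while the first one was (conceptually) still executing it.
    public static boolean secondThreadAcquiresRunningJob() {
        int maxLockTime = 600000;  // 10 minutes
        int lockBufferTime = 5000; // hypothetical value for illustration
        Job job = new Job();

        tryAcquire(job, "thread-A", 0L);  // A locks a long-running job at t=0
        long later = 20L * 60 * 1000;     // t=20 min, A is still working
        unlockIfOverdue(job, later, maxLockTime, lockBufferTime);
        return tryAcquire(job, "thread-B", later); // B picks up the same job
    }

    public static void main(String[] args) {
        System.out.println(secondThreadAcquiresRunningJob());
    }
}
```

Nothing in this sequence tells thread A that it has lost the lock, which is 
the part that looks harmful to me.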



View the original post : 
http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4084231#4084231
