Author: bfoster
Date: Wed Mar 23 00:12:30 2011
New Revision: 1084421

URL: http://svn.apache.org/viewvc?rev=1084421&view=rev
Log:

- improved synchronization of QueueManager

-------------------

Modified:
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java?rev=1084421&r1=1084420&r2=1084421&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
 (original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
 Wed Mar 23 00:12:30 2011
@@ -197,14 +197,19 @@ public class QueueManager {
                }
                if (stub != null) {
                        CachedWorkflowProcessor cachedWP = 
this.processorQueue.get(stub.getInstanceId());
-                       cachedWP.uncache();
-                       processorLock.lock(cachedWP.getInstanceId());
-                       TaskProcessor taskProcessor = (TaskProcessor) 
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), stub.getModelId());
-                       TaskInstance taskInstance = 
this.makeInstance(taskProcessor);
-                       this.executingTasks.put(taskProcessor.getInstanceId() + 
":" + taskProcessor.getModelId(), taskProcessor.getStub());
-                       processorLock.unlock(cachedWP.getInstanceId());
-                       cachedWP.cache();
-                       return taskInstance;
+                       try {
+                               cachedWP.uncache();
+                               processorLock.lock(cachedWP.getInstanceId());
+                               TaskProcessor taskProcessor = (TaskProcessor) 
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), stub.getModelId());
+                               TaskInstance taskInstance = 
this.makeInstance(taskProcessor);
+                               
this.executingTasks.put(taskProcessor.getInstanceId() + ":" + 
taskProcessor.getModelId(), taskProcessor.getStub());
+                               return taskInstance;
+                       }catch (Exception e) {
+                               throw e;
+                       }finally {
+                               processorLock.unlock(cachedWP.getInstanceId());
+                               cachedWP.cache();
+                       }
                }else { 
                        return null;
                }
@@ -226,15 +231,20 @@ public class QueueManager {
                try {
                        CachedWorkflowProcessor cachedWP = 
this.processorQueue.get(instanceId);
                        if (cachedWP != null) {
-                               cachedWP.uncache();
-                               processorLock.lock(cachedWP.getInstanceId());
-                               if (modelId != null)
-                                       
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), 
modelId).revertState();
-                               else
-                                       
cachedWP.getWorkflowProcessor().revertState();
-                               
WorkflowUtils.validateWorkflowProcessor(cachedWP.getWorkflowProcessor());
-                               processorLock.unlock(cachedWP.getInstanceId());
-                               cachedWP.cache();
+                               try {
+                                       cachedWP.uncache();
+                                       
processorLock.lock(cachedWP.getInstanceId());
+                                       if (modelId != null)
+                                               
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), 
modelId).revertState();
+                                       else
+                                               
cachedWP.getWorkflowProcessor().revertState();
+                                       
WorkflowUtils.validateWorkflowProcessor(cachedWP.getWorkflowProcessor());
+                               }catch (Exception e) {
+                                       throw e;
+                               }finally {
+                                       
processorLock.unlock(cachedWP.getInstanceId());
+                                       cachedWP.cache();
+                               }
                        }
                }catch (Exception e) {
                        LOG.log(Level.SEVERE, "Failed to revert state for 
workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + 
e.getMessage(), e);
@@ -245,13 +255,18 @@ public class QueueManager {
                try {
                        CachedWorkflowProcessor cachedWP = 
this.processorQueue.get(instanceId);
                        if (cachedWP != null) {
-                               cachedWP.uncache();
-                               processorLock.lock(cachedWP.getInstanceId());
-                               WorkflowProcessor wp = (modelId == null) ? 
cachedWP.getWorkflowProcessor() : 
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
-                               if (wp instanceof TaskProcessor)
-                                       ((TaskProcessor) wp).setJobId(jobId);
-                               processorLock.unlock(cachedWP.getInstanceId());
-                               cachedWP.cache();
+                               try {
+                                       cachedWP.uncache();
+                                       
processorLock.lock(cachedWP.getInstanceId());
+                                       WorkflowProcessor wp = (modelId == 
null) ? cachedWP.getWorkflowProcessor() : 
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+                                       if (wp instanceof TaskProcessor)
+                                               ((TaskProcessor) 
wp).setJobId(jobId);
+                               }catch (Exception e) {
+                                       throw e;
+                               }finally {
+                                       
processorLock.unlock(cachedWP.getInstanceId());
+                                       cachedWP.cache();
+                               }
                        }
                }catch (Exception e) {
                        LOG.log(Level.SEVERE, "Failed to set state for workflow 
[InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + 
e.getMessage(), e);
@@ -262,24 +277,29 @@ public class QueueManager {
                try {
                        CachedWorkflowProcessor cachedWP = 
this.processorQueue.get(instanceId);
                        if (cachedWP != null) {
-                               cachedWP.uncache();
-                               processorLock.lock(cachedWP.getInstanceId());
-                               WorkflowProcessor wp = (modelId == null) ? 
cachedWP.getWorkflowProcessor() : 
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
-                               if (state instanceof RevertableWorkflowState)
-                                       ((RevertableWorkflowState) 
state).setPrevState(wp.getState());
-                               wp.setState(state);
-                               if (wp instanceof TaskProcessor) {
-                                       if 
(this.executingTasks.containsKey(instanceId + ":" + modelId)) {
-                                               if (!(state instanceof 
ExecutingState))
-                                                       
this.executingTasks.remove(instanceId + ":" + modelId);
-                                               else
-                                                       
this.executingTasks.put(instanceId + ":" + modelId, wp.getStub());
-                                       }else {
-                                               this.updateRunnableStub(wp);
+                               try {
+                                       cachedWP.uncache();
+                                       
processorLock.lock(cachedWP.getInstanceId());
+                                       WorkflowProcessor wp = (modelId == 
null) ? cachedWP.getWorkflowProcessor() : 
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+                                       if (state instanceof 
RevertableWorkflowState)
+                                               ((RevertableWorkflowState) 
state).setPrevState(wp.getState());
+                                       wp.setState(state);
+                                       if (wp instanceof TaskProcessor) {
+                                               if 
(this.executingTasks.containsKey(instanceId + ":" + modelId)) {
+                                                       if (!(state instanceof 
ExecutingState))
+                                                               
this.executingTasks.remove(instanceId + ":" + modelId);
+                                                       else
+                                                               
this.executingTasks.put(instanceId + ":" + modelId, wp.getStub());
+                                               }else {
+                                                       
this.updateRunnableStub(wp);
+                                               }
                                        }
+                               }catch (Exception e) {
+                                       throw e;
+                               }finally {
+                                       
processorLock.unlock(cachedWP.getInstanceId());
+                                       cachedWP.cache();
                                }
-                               processorLock.unlock(cachedWP.getInstanceId());
-                               cachedWP.cache();
                        }
                }catch (Exception e) {
                        LOG.log(Level.SEVERE, "Failed to set state for workflow 
[InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + 
e.getMessage(), e);
@@ -294,14 +314,19 @@ public class QueueManager {
                        }
                        CachedWorkflowProcessor cachedWP = 
this.processorQueue.get(instanceId);
                        if (cachedWP != null) {
-                               cachedWP.uncache();
-                               processorLock.lock(cachedWP.getInstanceId());
-                               WorkflowProcessor wp = (modelId == null) ? 
cachedWP.getWorkflowProcessor() : 
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
-                               wp.setPriorityRecur(priority);
-                               if (wp instanceof TaskProcessor) 
-                                       this.updateRunnableStub(wp);
-                               processorLock.unlock(cachedWP.getInstanceId());
-                               cachedWP.cache();
+                               try {
+                                       cachedWP.uncache();
+                                       
processorLock.lock(cachedWP.getInstanceId());
+                                       WorkflowProcessor wp = (modelId == 
null) ? cachedWP.getWorkflowProcessor() : 
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+                                       wp.setPriorityRecur(priority);
+                                       if (wp instanceof TaskProcessor) 
+                                               this.updateRunnableStub(wp);
+                               }catch (Exception e) {
+                                       throw e;
+                               }finally {
+                                       
processorLock.unlock(cachedWP.getInstanceId());
+                                       cachedWP.cache();
+                               }
                        }
                }catch (Exception e) {
                        LOG.log(Level.SEVERE, "Failed to set priority for 
workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + 
e.getMessage(), e);
@@ -312,12 +337,17 @@ public class QueueManager {
                try {
                        CachedWorkflowProcessor cachedWP = 
this.processorQueue.get(instanceId);
                        if (cachedWP != null) {
-                               cachedWP.uncache();
-                               processorLock.lock(cachedWP.getInstanceId());
-                               WorkflowProcessor wp = modelId == null ? 
cachedWP.getWorkflowProcessor() : 
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
-                               wp.setDynamicMetadata(metadata);
-                               processorLock.unlock(cachedWP.getInstanceId());
-                               cachedWP.cache();
+                               try {
+                                       cachedWP.uncache();
+                                       
processorLock.lock(cachedWP.getInstanceId());
+                                       WorkflowProcessor wp = modelId == null 
? cachedWP.getWorkflowProcessor() : 
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+                                       wp.setDynamicMetadata(metadata);
+                               }catch (Exception e) {
+                                       throw e;
+                               }finally {
+                                       
processorLock.unlock(cachedWP.getInstanceId());
+                                       cachedWP.cache();
+                               }
                        }
                }catch (Exception e) {
                        LOG.log(Level.SEVERE, "Failed to set metadata for 
workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + 
e.getMessage(), e);
@@ -328,11 +358,16 @@ public class QueueManager {
                CachedWorkflowProcessor cachedWP = 
this.processorQueue.get(instanceId);
                WorkflowProcessor returnProcessor = null;
                if (cachedWP != null) {
-                       cachedWP.uncache();
-                       processorLock.lock(instanceId);
-                       returnProcessor = cachedWP.getWorkflowProcessor();
-                       processorLock.unlock(instanceId);
-                       cachedWP.cache();
+                       try {
+                               cachedWP.uncache();
+                               processorLock.lock(instanceId);
+                               returnProcessor = 
cachedWP.getWorkflowProcessor();
+                       }catch (RuntimeException e) {
+                               throw e;
+                       }finally {
+                               processorLock.unlock(instanceId);
+                               cachedWP.cache();
+                       }
                }               
                return returnProcessor;
        }
@@ -566,6 +601,10 @@ public class QueueManager {
                        try {
                                if (QueueManager.this.processorRepo != null)
                                        
QueueManager.this.processorRepo.delete(this.instanceId);
+                               this.wp = null;
+                               this.processorStub = null;
+                               this.cachedMetadata = null;
+                               this.instanceId = null;
                        }catch (Exception e) {
                                LOG.log(Level.WARNING, "Failed to delete " + 
this.instanceId + " : " + e.getMessage(), e);
                        }


Reply via email to