Author: bfoster
Date: Wed Mar 23 00:12:30 2011
New Revision: 1084421
URL: http://svn.apache.org/viewvc?rev=1084421&view=rev
Log:
- improved synchronization of QueueManager
-------------------
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
URL:
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java?rev=1084421&r1=1084420&r2=1084421&view=diff
==============================================================================
---
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
(original)
+++
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
Wed Mar 23 00:12:30 2011
@@ -197,14 +197,19 @@ public class QueueManager {
}
if (stub != null) {
CachedWorkflowProcessor cachedWP =
this.processorQueue.get(stub.getInstanceId());
- cachedWP.uncache();
- processorLock.lock(cachedWP.getInstanceId());
- TaskProcessor taskProcessor = (TaskProcessor)
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), stub.getModelId());
- TaskInstance taskInstance =
this.makeInstance(taskProcessor);
- this.executingTasks.put(taskProcessor.getInstanceId() +
":" + taskProcessor.getModelId(), taskProcessor.getStub());
- processorLock.unlock(cachedWP.getInstanceId());
- cachedWP.cache();
- return taskInstance;
+ try {
+ cachedWP.uncache();
+ processorLock.lock(cachedWP.getInstanceId());
+ TaskProcessor taskProcessor = (TaskProcessor)
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), stub.getModelId());
+ TaskInstance taskInstance =
this.makeInstance(taskProcessor);
+
this.executingTasks.put(taskProcessor.getInstanceId() + ":" +
taskProcessor.getModelId(), taskProcessor.getStub());
+ return taskInstance;
+ }catch (Exception e) {
+ throw e;
+ }finally {
+ processorLock.unlock(cachedWP.getInstanceId());
+ cachedWP.cache();
+ }
}else {
return null;
}
@@ -226,15 +231,20 @@ public class QueueManager {
try {
CachedWorkflowProcessor cachedWP =
this.processorQueue.get(instanceId);
if (cachedWP != null) {
- cachedWP.uncache();
- processorLock.lock(cachedWP.getInstanceId());
- if (modelId != null)
-
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(),
modelId).revertState();
- else
-
cachedWP.getWorkflowProcessor().revertState();
-
WorkflowUtils.validateWorkflowProcessor(cachedWP.getWorkflowProcessor());
- processorLock.unlock(cachedWP.getInstanceId());
- cachedWP.cache();
+ try {
+ cachedWP.uncache();
+
processorLock.lock(cachedWP.getInstanceId());
+ if (modelId != null)
+
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(),
modelId).revertState();
+ else
+
cachedWP.getWorkflowProcessor().revertState();
+
WorkflowUtils.validateWorkflowProcessor(cachedWP.getWorkflowProcessor());
+ }catch (Exception e) {
+ throw e;
+ }finally {
+
processorLock.unlock(cachedWP.getInstanceId());
+ cachedWP.cache();
+ }
}
}catch (Exception e) {
LOG.log(Level.SEVERE, "Failed to revert state for
workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " +
e.getMessage(), e);
@@ -245,13 +255,18 @@ public class QueueManager {
try {
CachedWorkflowProcessor cachedWP =
this.processorQueue.get(instanceId);
if (cachedWP != null) {
- cachedWP.uncache();
- processorLock.lock(cachedWP.getInstanceId());
- WorkflowProcessor wp = (modelId == null) ?
cachedWP.getWorkflowProcessor() :
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
- if (wp instanceof TaskProcessor)
- ((TaskProcessor) wp).setJobId(jobId);
- processorLock.unlock(cachedWP.getInstanceId());
- cachedWP.cache();
+ try {
+ cachedWP.uncache();
+
processorLock.lock(cachedWP.getInstanceId());
+ WorkflowProcessor wp = (modelId ==
null) ? cachedWP.getWorkflowProcessor() :
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+ if (wp instanceof TaskProcessor)
+ ((TaskProcessor)
wp).setJobId(jobId);
+ }catch (Exception e) {
+ throw e;
+ }finally {
+
processorLock.unlock(cachedWP.getInstanceId());
+ cachedWP.cache();
+ }
}
}catch (Exception e) {
LOG.log(Level.SEVERE, "Failed to set state for workflow
[InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " +
e.getMessage(), e);
@@ -262,24 +277,29 @@ public class QueueManager {
try {
CachedWorkflowProcessor cachedWP =
this.processorQueue.get(instanceId);
if (cachedWP != null) {
- cachedWP.uncache();
- processorLock.lock(cachedWP.getInstanceId());
- WorkflowProcessor wp = (modelId == null) ?
cachedWP.getWorkflowProcessor() :
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
- if (state instanceof RevertableWorkflowState)
- ((RevertableWorkflowState)
state).setPrevState(wp.getState());
- wp.setState(state);
- if (wp instanceof TaskProcessor) {
- if
(this.executingTasks.containsKey(instanceId + ":" + modelId)) {
- if (!(state instanceof
ExecutingState))
-
this.executingTasks.remove(instanceId + ":" + modelId);
- else
-
this.executingTasks.put(instanceId + ":" + modelId, wp.getStub());
- }else {
- this.updateRunnableStub(wp);
+ try {
+ cachedWP.uncache();
+
processorLock.lock(cachedWP.getInstanceId());
+ WorkflowProcessor wp = (modelId ==
null) ? cachedWP.getWorkflowProcessor() :
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+ if (state instanceof
RevertableWorkflowState)
+ ((RevertableWorkflowState)
state).setPrevState(wp.getState());
+ wp.setState(state);
+ if (wp instanceof TaskProcessor) {
+ if
(this.executingTasks.containsKey(instanceId + ":" + modelId)) {
+ if (!(state instanceof
ExecutingState))
+
this.executingTasks.remove(instanceId + ":" + modelId);
+ else
+
this.executingTasks.put(instanceId + ":" + modelId, wp.getStub());
+ }else {
+
this.updateRunnableStub(wp);
+ }
}
+ }catch (Exception e) {
+ throw e;
+ }finally {
+
processorLock.unlock(cachedWP.getInstanceId());
+ cachedWP.cache();
}
- processorLock.unlock(cachedWP.getInstanceId());
- cachedWP.cache();
}
}catch (Exception e) {
LOG.log(Level.SEVERE, "Failed to set state for workflow
[InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " +
e.getMessage(), e);
@@ -294,14 +314,19 @@ public class QueueManager {
}
CachedWorkflowProcessor cachedWP =
this.processorQueue.get(instanceId);
if (cachedWP != null) {
- cachedWP.uncache();
- processorLock.lock(cachedWP.getInstanceId());
- WorkflowProcessor wp = (modelId == null) ?
cachedWP.getWorkflowProcessor() :
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
- wp.setPriorityRecur(priority);
- if (wp instanceof TaskProcessor)
- this.updateRunnableStub(wp);
- processorLock.unlock(cachedWP.getInstanceId());
- cachedWP.cache();
+ try {
+ cachedWP.uncache();
+
processorLock.lock(cachedWP.getInstanceId());
+ WorkflowProcessor wp = (modelId ==
null) ? cachedWP.getWorkflowProcessor() :
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+ wp.setPriorityRecur(priority);
+ if (wp instanceof TaskProcessor)
+ this.updateRunnableStub(wp);
+ }catch (Exception e) {
+ throw e;
+ }finally {
+
processorLock.unlock(cachedWP.getInstanceId());
+ cachedWP.cache();
+ }
}
}catch (Exception e) {
LOG.log(Level.SEVERE, "Failed to set priority for
workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " +
e.getMessage(), e);
@@ -312,12 +337,17 @@ public class QueueManager {
try {
CachedWorkflowProcessor cachedWP =
this.processorQueue.get(instanceId);
if (cachedWP != null) {
- cachedWP.uncache();
- processorLock.lock(cachedWP.getInstanceId());
- WorkflowProcessor wp = modelId == null ?
cachedWP.getWorkflowProcessor() :
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
- wp.setDynamicMetadata(metadata);
- processorLock.unlock(cachedWP.getInstanceId());
- cachedWP.cache();
+ try {
+ cachedWP.uncache();
+
processorLock.lock(cachedWP.getInstanceId());
+ WorkflowProcessor wp = modelId == null
? cachedWP.getWorkflowProcessor() :
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+ wp.setDynamicMetadata(metadata);
+ }catch (Exception e) {
+ throw e;
+ }finally {
+
processorLock.unlock(cachedWP.getInstanceId());
+ cachedWP.cache();
+ }
}
}catch (Exception e) {
LOG.log(Level.SEVERE, "Failed to set metadata for
workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " +
e.getMessage(), e);
@@ -328,11 +358,16 @@ public class QueueManager {
CachedWorkflowProcessor cachedWP =
this.processorQueue.get(instanceId);
WorkflowProcessor returnProcessor = null;
if (cachedWP != null) {
- cachedWP.uncache();
- processorLock.lock(instanceId);
- returnProcessor = cachedWP.getWorkflowProcessor();
- processorLock.unlock(instanceId);
- cachedWP.cache();
+ try {
+ cachedWP.uncache();
+ processorLock.lock(instanceId);
+ returnProcessor =
cachedWP.getWorkflowProcessor();
+ }catch (RuntimeException e) {
+ throw e;
+ }finally {
+ processorLock.unlock(instanceId);
+ cachedWP.cache();
+ }
}
return returnProcessor;
}
@@ -566,6 +601,10 @@ public class QueueManager {
try {
if (QueueManager.this.processorRepo != null)
QueueManager.this.processorRepo.delete(this.instanceId);
+ this.wp = null;
+ this.processorStub = null;
+ this.cachedMetadata = null;
+ this.instanceId = null;
}catch (Exception e) {
LOG.log(Level.WARNING, "Failed to delete " +
this.instanceId + " : " + e.getMessage(), e);
}