Author: bfoster
Date: Mon Feb 28 23:41:08 2011
New Revision: 1075591
URL: http://svn.apache.org/viewvc?rev=1075591&view=rev
Log:
- allows EngineRunners to override the JobId assigned to a task
- fixed getPage not sorting its results (it sorted the unfiltered list instead of the accepted one)
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java?rev=1075591&r1=1075590&r2=1075591&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java Mon Feb 28 23:41:08 2011
@@ -131,7 +131,10 @@ public class WorkflowEngineLocal impleme
nextTask =
WorkflowEngineLocal.this.queueManager.getNext();
while (!pauseRunner &&
allowRunnerToWork && nextTask != null &&
WorkflowEngineLocal.this.runner.hasOpenSlots(nextTask)) {
nextTask.setNotifyEngine(WorkflowEngineLocal.this.weClient);
+ String jobId =
nextTask.getJobId();
WorkflowEngineLocal.this.runner.execute(nextTask);
+ if
(!jobId.equals(nextTask.getJobId()))
+
WorkflowEngineLocal.this.queueManager.setJobId(nextTask.getInstanceId(),
nextTask.getModelId(), nextTask.getJobId());
nextTask =
WorkflowEngineLocal.this.queueManager.getNext();
//take a breather
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java?rev=1075591&r1=1075590&r2=1075591&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java Mon Feb 28 23:41:08 2011
@@ -217,12 +217,8 @@ public class QueueManager {
ti.setStaticMetadata(taskProcessor.getStaticMetadata());
ti.setModelId(taskProcessor.getModelId());
ti.setExecutionType(taskProcessor.getExecutionType());
- if (taskProcessor.getJobId() == null) {
- ti.setJobId(UUID.randomUUID().toString());
- taskProcessor.setJobId(ti.getJobId());
- }else {
- ti.setJobId(taskProcessor.getJobId());
- }
+ ti.setJobId(UUID.randomUUID().toString());
+ taskProcessor.setJobId(ti.getJobId());
return ti;
}
@@ -245,6 +241,23 @@ public class QueueManager {
}
}
+ public void setJobId(String instanceId, String modelId, String jobId) {
+ try {
+ CachedWorkflowProcessor cachedWP =
this.processorQueue.get(instanceId);
+ if (cachedWP != null) {
+ cachedWP.uncache();
+ processorLock.lock(cachedWP.getInstanceId());
+ WorkflowProcessor wp = (modelId == null) ?
cachedWP.getWorkflowProcessor() :
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+ if (wp instanceof TaskProcessor)
+ ((TaskProcessor) wp).setJobId(jobId);
+ processorLock.unlock(cachedWP.getInstanceId());
+ cachedWP.cache();
+ }
+ }catch (Exception e) {
+ LOG.log(Level.SEVERE, "Failed to set state for workflow
[InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " +
e.getMessage(), e);
+ }
+ }
+
public void setState(String instanceId, String modelId, WorkflowState
state) {
try {
CachedWorkflowProcessor cachedWP =
this.processorQueue.get(instanceId);
@@ -395,7 +408,7 @@ public class QueueManager {
return new QueuePage(this.getProcessedPageInfo(pageInfo,
sortedCachedWPs.size()), this.getPage(pageInfo, sortedCachedWPs), comparator);
}
- public QueuePage getPage(PageInfo pageInfo, PageFilter filter,
Comparator<ProcessorStub> comparator) {
+ public QueuePage getPage(PageInfo pageInfo, PageFilter filter, final
Comparator<ProcessorStub> comparator) {
Vector<CachedWorkflowProcessor> acceptedWPs = new
Vector<CachedWorkflowProcessor>();
Vector<CachedWorkflowProcessor> cachedWPs = null;
synchronized(processorQueue) {
@@ -406,11 +419,10 @@ public class QueueManager {
if (filter.accept(cachedWP.getStub(),
cachedWP.getCachedMetadata()))
acceptedWPs.add(cachedWP);
if (comparator != null) {
- final Comparator<ProcessorStub> comparatorFinal =
comparator;
- Collections.sort(cachedWPs, new
Comparator<CachedWorkflowProcessor>() {
+ Collections.sort(acceptedWPs, new
Comparator<CachedWorkflowProcessor>() {
public int compare(CachedWorkflowProcessor o1,
CachedWorkflowProcessor o2) {
- return
comparatorFinal.compare(o1.getStub(), o2.getStub());
+ return comparator.compare(o1.getStub(),
o2.getStub());
}
});
}
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java?rev=1075591&r1=1075590&r2=1075591&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java Mon Feb 28 23:41:08 2011
@@ -46,8 +46,8 @@ public class LocalEngineRunner extends E
public LocalEngineRunner(int numOfSlots, int cacheSize) {
this.numOfSlots = numOfSlots;
this.cacheSize = cacheSize;
+ this.cache = new Vector<TaskInstance>();
if (this.cacheSize > 0) {
- cache = new Vector<TaskInstance>();
new Thread(new Runnable() {
public void run() {
while (running) {
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java?rev=1075591&r1=1075590&r2=1075591&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java Mon Feb 28 23:41:08 2011
@@ -45,8 +45,8 @@ public class ResourceRunner extends Engi
ResourceJobInput input = new ResourceJobInput();
input.workflowInstance = workflowInstance;
Job job = new Job();
- job.setId(workflowInstance.getJobId());
- job.setName(workflowInstance.getModelId());
+// job.setId(workflowInstance.getJobId());
+ job.setName(workflowInstance.getInstanceId() + ":" +
workflowInstance.getModelId());
job.setJobInputClassName(ResourceJobInput.class.getCanonicalName());
job.setJobInstanceClassName(ResourceJobInstance.class.getCanonicalName());
Metadata m = workflowInstance.getMetadata();
@@ -58,7 +58,7 @@ public class ResourceRunner extends Engi
job.setQueueName(m.getMetadata(QUEUE_NAME));
else
throw new Exception("Must specify 'QueueName' for task
[instanceId = '" + workflowInstance.getInstanceId() + "', modelId = '" +
workflowInstance.getModelId() + "']");
- this.rsManagerClient.submitJob(job, input);
+ workflowInstance.setJobId(this.rsManagerClient.submitJob(job,
input));
}
@Override