Author: bfoster
Date: Mon Feb 28 23:41:08 2011
New Revision: 1075591

URL: http://svn.apache.org/viewvc?rev=1075591&view=rev
Log:

- allows for EngineRunners to override JobId

- fixed GetPage not sorting

Modified:
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java?rev=1075591&r1=1075590&r2=1075591&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java
 (original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java
 Mon Feb 28 23:41:08 2011
@@ -131,7 +131,10 @@ public class WorkflowEngineLocal impleme
                                                        nextTask = 
WorkflowEngineLocal.this.queueManager.getNext();
                                                while (!pauseRunner && 
allowRunnerToWork && nextTask != null && 
WorkflowEngineLocal.this.runner.hasOpenSlots(nextTask)) {
                                                        
nextTask.setNotifyEngine(WorkflowEngineLocal.this.weClient);
+                                                       String jobId = 
nextTask.getJobId();
                                                        
WorkflowEngineLocal.this.runner.execute(nextTask);
+                                                       if 
(!jobId.equals(nextTask.getJobId()))
+                                                               
WorkflowEngineLocal.this.queueManager.setJobId(nextTask.getInstanceId(), 
nextTask.getModelId(), nextTask.getJobId());
                                                        nextTask = 
WorkflowEngineLocal.this.queueManager.getNext();
 
                                                        //take a breather

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java?rev=1075591&r1=1075590&r2=1075591&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
 (original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
 Mon Feb 28 23:41:08 2011
@@ -217,12 +217,8 @@ public class QueueManager {
                ti.setStaticMetadata(taskProcessor.getStaticMetadata());
                ti.setModelId(taskProcessor.getModelId());
                ti.setExecutionType(taskProcessor.getExecutionType());
-               if (taskProcessor.getJobId() == null) {
-                       ti.setJobId(UUID.randomUUID().toString());
-                       taskProcessor.setJobId(ti.getJobId());
-               }else {
-                       ti.setJobId(taskProcessor.getJobId());
-               }
+               ti.setJobId(UUID.randomUUID().toString());
+               taskProcessor.setJobId(ti.getJobId());
                return ti;
        }
        
@@ -245,6 +241,23 @@ public class QueueManager {
                }
        }
        
+       public void setJobId(String instanceId, String modelId, String jobId) {
+               try {
+                       CachedWorkflowProcessor cachedWP = 
this.processorQueue.get(instanceId);
+                       if (cachedWP != null) {
+                               cachedWP.uncache();
+                               processorLock.lock(cachedWP.getInstanceId());
+                               WorkflowProcessor wp = (modelId == null) ? 
cachedWP.getWorkflowProcessor() : 
WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+                               if (wp instanceof TaskProcessor)
+                                       ((TaskProcessor) wp).setJobId(jobId);
+                               processorLock.unlock(cachedWP.getInstanceId());
+                               cachedWP.cache();
+                       }
+               }catch (Exception e) {
+                       LOG.log(Level.SEVERE, "Failed to set state for workflow 
[InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + 
e.getMessage(), e);
+               }
+       }
+       
        public void setState(String instanceId, String modelId, WorkflowState 
state) {
                try {
                        CachedWorkflowProcessor cachedWP = 
this.processorQueue.get(instanceId);
@@ -395,7 +408,7 @@ public class QueueManager {
                return new QueuePage(this.getProcessedPageInfo(pageInfo, 
sortedCachedWPs.size()), this.getPage(pageInfo, sortedCachedWPs), comparator);
     }
     
-    public QueuePage getPage(PageInfo pageInfo, PageFilter filter, 
Comparator<ProcessorStub> comparator) {
+    public QueuePage getPage(PageInfo pageInfo, PageFilter filter, final 
Comparator<ProcessorStub> comparator) {
        Vector<CachedWorkflowProcessor> acceptedWPs = new 
Vector<CachedWorkflowProcessor>();
        Vector<CachedWorkflowProcessor> cachedWPs = null;
        synchronized(processorQueue) {
@@ -406,11 +419,10 @@ public class QueueManager {
                                if (filter.accept(cachedWP.getStub(), 
cachedWP.getCachedMetadata()))
                                        acceptedWPs.add(cachedWP);
                if (comparator != null) {
-                       final Comparator<ProcessorStub> comparatorFinal = 
comparator;
-                       Collections.sort(cachedWPs, new 
Comparator<CachedWorkflowProcessor>() {
+                       Collections.sort(acceptedWPs, new 
Comparator<CachedWorkflowProcessor>() {
                                public int compare(CachedWorkflowProcessor o1,
                                                CachedWorkflowProcessor o2) {
-                                       return 
comparatorFinal.compare(o1.getStub(), o2.getStub());
+                                       return comparator.compare(o1.getStub(), 
o2.getStub());
                                }
                        });
                }

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java?rev=1075591&r1=1075590&r2=1075591&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
 (original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
 Mon Feb 28 23:41:08 2011
@@ -46,8 +46,8 @@ public class LocalEngineRunner extends E
        public LocalEngineRunner(int numOfSlots, int cacheSize) {
                this.numOfSlots = numOfSlots;
                this.cacheSize = cacheSize;
+               this.cache = new Vector<TaskInstance>();
                if (this.cacheSize > 0) {
-                       cache = new Vector<TaskInstance>();
                        new Thread(new Runnable() {
                                public void run() {
                                        while (running) {

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java?rev=1075591&r1=1075590&r2=1075591&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
 (original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
 Mon Feb 28 23:41:08 2011
@@ -45,8 +45,8 @@ public class ResourceRunner extends Engi
                ResourceJobInput input = new ResourceJobInput();
                input.workflowInstance = workflowInstance;
                Job job = new Job();
-               job.setId(workflowInstance.getJobId());
-               job.setName(workflowInstance.getModelId());
+//             job.setId(workflowInstance.getJobId());
+               job.setName(workflowInstance.getInstanceId() + ":" + 
workflowInstance.getModelId());
                
job.setJobInputClassName(ResourceJobInput.class.getCanonicalName());
                
job.setJobInstanceClassName(ResourceJobInstance.class.getCanonicalName());
                Metadata m = workflowInstance.getMetadata();
@@ -58,7 +58,7 @@ public class ResourceRunner extends Engi
                        job.setQueueName(m.getMetadata(QUEUE_NAME));
                else
                        throw new Exception("Must specify 'QueueName' for task 
[instanceId = '" + workflowInstance.getInstanceId() + "', modelId = '" + 
workflowInstance.getModelId() + "']");
-               this.rsManagerClient.submitJob(job, input);
+               workflowInstance.setJobId(this.rsManagerClient.submitJob(job, 
input));
        }
 
        @Override


Reply via email to