Author: mattmann
Date: Thu Sep 20 06:35:44 2012
New Revision: 1387871

URL: http://svn.apache.org/viewvc?rev=1387871&view=rev
Log:
- fix for OODT-496: Convert EngineRunner interface to take TaskProcessor

Added:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AbstractEngineRunnerBase.java
Modified:
    oodt/trunk/CHANGES.txt
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunnerFactory.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java

Modified: oodt/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Thu Sep 20 06:35:44 2012
@@ -4,6 +4,8 @@ Apache OODT Change Log
 Release 0.5
 --------------------------------------------
 
+* OODT-496: Convert EngineRunner interface to take TaskProcessor (mattmann)
+
 * OODT-505: Remove synchronous Runner (mattmann)
 
 * OODT-498: Overwrite and bring back 0.3 ThreadPoolWorkflowEngine plus 

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java Thu Sep 20 06:35:44 2012
@@ -115,7 +115,7 @@ public class TaskQuerier implements Runn
             && !processor.isAnyState("Executing")
             && processor.getRunnableWorkflowProcessors().size() > 0) {
           for (TaskProcessor tp : processor.getRunnableWorkflowProcessors()) {
-            WorkflowState state = lifecycle.createState("Executing", "running",
+            WorkflowState state = lifecycle.createState("WaitingOnResources", "waiting",
                 "Added to Runnable queue");
             tp.getWorkflowInstance().setState(state);
             persist(tp.getWorkflowInstance());

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java Thu Sep 20 06:35:44 2012
@@ -78,17 +78,14 @@ public class TaskRunner implements Runna
    */
   @Override
   public void run() {
-    WorkflowTask nextTask = null;
     TaskProcessor nextTaskProcessor = null;
 
     while (running) {
       nextTaskProcessor = taskQuerier.getNext();
-      nextTask = nextTaskProcessor != null ? extractTaskFromProcessor(nextTaskProcessor)
-          : null;
 
       try {
-        if (nextTaskProcessor != null && runner.hasOpenSlots(nextTask)) {
-          runner.execute(nextTask, nextTaskProcessor.getWorkflowInstance().getSharedContext());
+        if (nextTaskProcessor != null && runner.hasOpenSlots(nextTaskProcessor)) {
+          runner.execute(nextTaskProcessor);
         }
       } catch (Exception e) {
         e.printStackTrace();
@@ -96,9 +93,8 @@ public class TaskRunner implements Runna
             Level.SEVERE,
             "Engine failed while submitting jobs to its runner : "
                 + e.getMessage(), e);
-        if (nextTask != null) {
+        if (nextTaskProcessor != null) {
           this.flagProcessorAsFailed(nextTaskProcessor, e.getMessage());
-          nextTask = null;
           nextTaskProcessor = null;
         }
       }
@@ -140,6 +136,7 @@ public class TaskRunner implements Runna
         .getDefaultLifecycle()
         .createState("Failure", "done",
             "Failed while submitting job to Runner : " + msg));
+    //TODO: persist me?
 
   }
 

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java Thu Sep 20 06:35:44 2012
@@ -281,9 +281,7 @@ public abstract class WorkflowProcessor 
                   + "running preconditiosn for workflow instance: ["
                   + this.workflowInstance.getId() + "]");
         } else {
-          if (this.getRunnableWorkflowProcessors() != null
-              && this.getRunnableWorkflowProcessors().size() == 0
-              && this.passedPostConditions()) {
+          if (this.isDone().getName().equals("ResultsSuccess")) {
             nextState = this.helper.getLifecycleForProcessor(this).createState(
                 "Success",
                 "done",
@@ -293,11 +291,13 @@ public abstract class WorkflowProcessor 
           }
         }
       } else if (currState.getName().equals("Executing")) {
+        if(this.isDone().getName().equals("ResultsSuccess")){
         nextState = this.helper.getLifecycleForProcessor(this).createState(
             "Success",
             "done",
             "Workflow Processor: nextState: " + "workflow instance: ["
                 + this.workflowInstance.getId() + "] completed successfully");
+        }
       }
 
       if (nextState != null) {

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java Thu Sep 20 06:35:44 2012
@@ -36,6 +36,7 @@ import org.apache.oodt.cas.workflow.repo
 import org.apache.oodt.cas.workflow.structs.Graph;
 import org.apache.oodt.cas.workflow.structs.ParentChildWorkflow;
 import org.apache.oodt.cas.workflow.structs.Workflow;
+import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
 import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
 import org.apache.oodt.cas.workflow.structs.WorkflowInstancePage;
 import org.apache.oodt.cas.workflow.structs.WorkflowTask;
@@ -56,11 +57,11 @@ public class WorkflowProcessorQueue {
       .getLogger(WorkflowProcessorQueue.class.getName());
 
   private WorkflowInstanceRepository repo;
-  
+
   private WorkflowRepository modelRepo;
 
   private WorkflowLifecycleManager lifecycle;
-  
+
   private Map<String, WorkflowProcessor> processorCache;
 
   public WorkflowProcessorQueue(WorkflowInstanceRepository repo,
@@ -88,11 +89,11 @@ public class WorkflowProcessorQueue {
     }
 
     List<WorkflowProcessor> processors = new Vector<WorkflowProcessor>(
-        page.getPageWorkflows() != null ? page.getPageWorkflows().size():0);
+        page.getPageWorkflows() != null ? page.getPageWorkflows().size() : 0);
     for (WorkflowInstance inst : (List<WorkflowInstance>) (List<?>) page
         .getPageWorkflows()) {
-      if(!inst.getState().getCategory().getName().equals("done")){         
-         processors.add(fromWorkflowInstance(inst));
+      if (!inst.getState().getCategory().getName().equals("done")) {
+        processors.add(fromWorkflowInstance(inst));
       }
     }
 
@@ -101,82 +102,90 @@ public class WorkflowProcessorQueue {
 
   private WorkflowProcessor fromWorkflowInstance(WorkflowInstance inst) {
     WorkflowProcessor processor = null;
-    if(processorCache.containsKey(inst.getId())){
+    if (processorCache.containsKey(inst.getId())) {
       return processorCache.get(inst.getId());
-    }
-    else{
+    } else {
       if (inst.getParentChildWorkflow().getTasks() != null
           && inst.getParentChildWorkflow().getTasks().size() > 1) {
-        processor = new SequentialProcessor(lifecycle, inst);
-        WorkflowState seqProcessorState =  
-          getLifecycle(inst.getParentChildWorkflow()).
-             createState("Loaded", "initial", "Sequential Workflow instance with id: ["
-                 + inst.getId()+"] loaded by processor queue.");
+        processor = getProcessorFromInstanceGraph(inst, lifecycle);
+        WorkflowState seqProcessorState = getLifecycle(
+            inst.getParentChildWorkflow()).createState(
+            "Loaded",
+            "initial",
+            "Sequential Workflow instance with id: [" + inst.getId()
+                + "] loaded by processor queue.");
         inst.setState(seqProcessorState);
-        persist(inst);        
-        
+        persist(inst);
+
+        for (WorkflowCondition cond : inst.getParentChildWorkflow()
+            .getPreConditions()) {
+
+        }
+
         for (WorkflowTask task : inst.getParentChildWorkflow().getTasks()) {
           WorkflowInstance instance = new WorkflowInstance();
-          WorkflowState taskWorkflowState = 
-             lifecycle.getDefaultLifecycle().createState("Null", "initial",
-                 "Sub Task Workflow created by Workflow Processor Queue for workflow instance: " +
-                 "["+inst.getId()+"]");
+          WorkflowState taskWorkflowState = lifecycle.getDefaultLifecycle()
+              .createState(
+                  "Null",
+                  "initial",
+                  "Sub Task Workflow created by Workflow Processor Queue for workflow instance: "
+                      + "[" + inst.getId() + "]");
           instance.setState(taskWorkflowState);
           instance.setPriority(inst.getPriority());
           instance.setCurrentTaskId(task.getTaskId());
           ParentChildWorkflow workflow = new ParentChildWorkflow(new Graph());
           String taskWorkflowId = UUID.randomUUID().toString();
-          workflow.setId("task-workflow-"+ taskWorkflowId);
-          workflow.setName("Task Workflow-"+task.getTaskName());
+          workflow.setId("task-workflow-" + taskWorkflowId);
+          workflow.setName("Task Workflow-" + task.getTaskName());
           workflow.getTasks().add(task);
           workflow.getGraph().setTask(task);
           instance.setId(taskWorkflowId);
           instance.setParentChildWorkflow(workflow);
-          this.addToModelRepo(workflow);     
+          this.addToModelRepo(workflow);
           persist(inst);
           WorkflowProcessor subProcessor = fromWorkflowInstance(instance);
-          processor.getSubProcessors().add(subProcessor);        
-        }              
-      }
-      else{
+          processor.getSubProcessors().add(subProcessor);
+        }
+      } else {
         processor = new TaskProcessor(lifecycle, inst);
-        WorkflowState taskProcessorState =  
-          getLifecycle(inst.getParentChildWorkflow()).
-             createState("Loaded", "initial", "Task Workflow instance with id: ["
-                 + inst.getId()+"] loaded by processor queue.");
-        inst.setState(taskProcessorState);   
+        WorkflowState taskProcessorState = getLifecycle(
+            inst.getParentChildWorkflow()).createState(
+            "Loaded",
+            "initial",
+            "Task Workflow instance with id: [" + inst.getId()
+                + "] loaded by processor queue.");
+        inst.setState(taskProcessorState);
         persist(inst);
       }
-      
-      synchronized(processorCache){
+
+      synchronized (processorCache) {
         processorCache.put(inst.getId(), processor);
       }
-      return processor;      
+      return processor;
     }
 
   }
-  
-  private void addToModelRepo(Workflow workflow){
-    if(modelRepo != null){
+
+  private void addToModelRepo(Workflow workflow) {
+    if (modelRepo != null) {
       try {
         modelRepo.addWorkflow(workflow);
       } catch (RepositoryException e) {
         e.printStackTrace();
       }
-    }    
+    }
   }
-  
-  private void persist(WorkflowInstance instance){
+
+  private void persist(WorkflowInstance instance) {
     try {
       this.repo.updateWorkflowInstance(instance);
     } catch (Exception e) {
       e.printStackTrace();
-      LOG.log(
-          Level.WARNING,
+      LOG.log(Level.WARNING,
           "Unable to update workflow instance: [" + instance.getId()
-              + "] with status: [" + instance.getState().getName() + "]: Message: "
-              + e.getMessage());
-    }    
+              + "] with status: [" + instance.getState().getName()
+              + "]: Message: " + e.getMessage());
+    }
   }
 
   private WorkflowLifecycle getLifecycle(Workflow workflow) {
@@ -184,4 +193,15 @@ public class WorkflowProcessorQueue {
         .getLifecycleForWorkflow(workflow) : lifecycle.getDefaultLifecycle();
   }
 
+  private WorkflowProcessor getProcessorFromInstanceGraph(
+      WorkflowInstance instance, WorkflowLifecycleManager lifecycle) {
+    Graph graph = instance.getParentChildWorkflow().getGraph();
+    if (graph != null && graph.getExecutionType() != null
+        && graph.getExecutionType().equals("sequential")) {
+      return new SequentialProcessor(lifecycle, instance);
+    } else {
+      return new ParallelProcessor(lifecycle, instance);
+    }
+  }
+
 }

Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AbstractEngineRunnerBase.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AbstractEngineRunnerBase.java?rev=1387871&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AbstractEngineRunnerBase.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AbstractEngineRunnerBase.java Thu Sep 20 06:35:44 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine.runner;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycle;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+
+/**
+ * 
+ * An abstract base class providing helper functionality to persist
+ * {@link WorkflowInstance}s, to get {@link WorkflowLifecycle}s from underlying
+ * {@link TaskProcessor}s, and to get {@link WorkflowTask}s from the underlying
+ * {@link TaskProcessor}.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+public abstract class AbstractEngineRunnerBase extends EngineRunner {
+
+  protected final WorkflowInstanceRepository instRep;
+
+  private static final Logger LOG = Logger
+      .getLogger(AbstractEngineRunnerBase.class.getName());
+
+  /**
+   * Creates a new AbstractEngineRunnerBase with the provided
+   * {@link WorkflowInstanceRepository}.
+   * 
+   * @param instRep
+   *          The {@link WorkflowInstanceRepository} to use to persist
+   *          {@link TaskProcessor} {@link WorkflowInstance} information.
+   */
+  public AbstractEngineRunnerBase(WorkflowInstanceRepository instRep) {
+    this.instRep = instRep;
+  }
+
+  protected WorkflowTask getTaskFromProcessor(TaskProcessor taskProcessor) {
+    if (taskProcessor.getWorkflowInstance() != null
+        && taskProcessor.getWorkflowInstance().getParentChildWorkflow() != null
+        && taskProcessor.getWorkflowInstance().getParentChildWorkflow()
+            .getGraph() != null) {
+      if (taskProcessor.getWorkflowInstance().getParentChildWorkflow()
+          .getGraph().getTask() != null) {
+        return taskProcessor.getWorkflowInstance().getParentChildWorkflow()
+            .getGraph().getTask();
+      } else
+        return taskProcessor.getWorkflowInstance().getParentChildWorkflow()
+            .getTasks().get(0);
+    } else
+      return taskProcessor.getWorkflowInstance().getParentChildWorkflow()
+          .getTasks().get(0);
+  }
+
+  protected WorkflowLifecycle getLifecycle(TaskProcessor taskProcessor) {
+    return taskProcessor.getLifecycleManager().getDefaultLifecycle();
+  }
+
+  protected void persist(WorkflowInstance instance) {
+    try {
+      instRep.updateWorkflowInstance(instance);
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOG.log(Level.WARNING, "Unabled to persist workflow instance: ["
+          + instance.getId() + "]: Message: " + e.getMessage());
+    }
+  }
+
+}

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java Thu Sep 20 06:35:44 2012
@@ -27,18 +27,21 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 //OODT imports
-import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycle;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowState;
 import org.apache.oodt.cas.workflow.structs.WorkflowTask;
 import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
 import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
 
 /**
- * Runs a local version of a {@link WorkflowTask} asynchronously.
- *
+ * Runs a local version of a {@link TaskProcessor} asynchronously.
+ * 
  * @author mattmann (Chris Mattmann)
  * @author bfoster (Brian Foster)
  */
-public class AsynchronousLocalEngineRunner extends EngineRunner {
+public class AsynchronousLocalEngineRunner extends AbstractEngineRunnerBase {
 
   private static final Logger LOG = Logger
       .getLogger(AsynchronousLocalEngineRunner.class.getName());
@@ -49,44 +52,58 @@ public class AsynchronousLocalEngineRunn
   private final Map<String, Thread> workerMap;
 
   public AsynchronousLocalEngineRunner() {
-     this(DEFAULT_NUM_THREADS);
+    this(DEFAULT_NUM_THREADS, null);
   }
 
-  public AsynchronousLocalEngineRunner(int numThreads) {
-    this.executor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS);
+  public AsynchronousLocalEngineRunner(int numThreads,
+      WorkflowInstanceRepository instRep) {
+    super(instRep);
+    this.executor = Executors.newFixedThreadPool(numThreads);
     this.workerMap = new HashMap<String, Thread>();
   }
 
   /*
    * (non-Javadoc)
-   *
+   * 
    * @see
-   * org.apache.oodt.cas.workflow.engine.EngineRunner#execute(org.apache.oodt
-   * .cas.workflow.structs.WorkflowTask, org.apache.oodt.cas.metadata.Metadata)
+   * org.apache.oodt.cas.workflow.engine.runner.EngineRunner#execute(org.apache
+   * .oodt.cas.workflow.engine.processor.TaskProcessor)
    */
   @Override
-  public void execute(final WorkflowTask workflowTask,
-      final Metadata dynMetadata) throws Exception {
+  public void execute(final TaskProcessor taskProcessor) throws Exception {
     Thread worker = new Thread() {
 
       @Override
       public void run() {
+        WorkflowLifecycle lifecycle = getLifecycle(taskProcessor);
+        WorkflowTask workflowTask = getTaskFromProcessor(taskProcessor);
         WorkflowTaskInstance inst = GenericWorkflowObjectFactory
             .getTaskObjectFromClassName(workflowTask.getTaskInstanceClassName());
         try {
-          inst.run(dynMetadata, workflowTask.getTaskConfig());
+          inst.run(taskProcessor.getWorkflowInstance().getSharedContext(),
+              workflowTask.getTaskConfig());
+          String msg = "Task: [" + workflowTask.getTaskName()
+              + "] for instance id: ["
+              + taskProcessor.getWorkflowInstance().getId()
+              + "] completed successfully";
+          WorkflowState state = lifecycle.createState("ExecutionComplete", "transition", msg);
+          taskProcessor.getWorkflowInstance().setState(state);
+          persist(taskProcessor.getWorkflowInstance());
         } catch (Exception e) {
           e.printStackTrace();
-          LOG.log(Level.WARNING,
-              "Exception executing task: [" + workflowTask.getTaskName()
-                  + "]: Message: " + e.getMessage());
+          String msg = "Exception executing task: ["
+              + workflowTask.getTaskName() + "]: Message: " + e.getMessage();
+          LOG.log(Level.WARNING, msg);
+          WorkflowState state = lifecycle.createState("Failure", "done", msg);
+          taskProcessor.getWorkflowInstance().setState(state);
+          persist(taskProcessor.getWorkflowInstance());
         }
 
       }
 
       /*
        * (non-Javadoc)
-       *
+       * 
        * @see java.lang.Thread#interrupt()
        */
       @SuppressWarnings("deprecation")
@@ -108,7 +125,7 @@ public class AsynchronousLocalEngineRunn
 
   /*
    * (non-Javadoc)
-   *
+   * 
    * @see org.apache.oodt.cas.workflow.engine.EngineRunner#shutdown()
    */
   @Override
@@ -122,11 +139,15 @@ public class AsynchronousLocalEngineRunn
 
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.oodt.cas.workflow.engine.EngineRunner#hasOpenSlots(org.apache.oodt.cas.workflow.structs.WorkflowTask)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.runner.EngineRunner#hasOpenSlots(org
+   * .apache.oodt.cas.workflow.engine.processor.TaskProcessor)
    */
   @Override
-  public boolean hasOpenSlots(WorkflowTask workflowTask) throws Exception {
+  public boolean hasOpenSlots(TaskProcessor taskProcessor) throws Exception {
     // TODO Auto-generated method stub
     return true;
   }

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunnerFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunnerFactory.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunnerFactory.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunnerFactory.java Thu Sep 20 06:35:44 2012
@@ -16,29 +16,44 @@
  */
 package org.apache.oodt.cas.workflow.engine.runner;
 
+//OODT imports
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+
 /**
- * A {@link EngineRunnerFactory} which creates {@link AsynchronousLocalEngineRunner}s.
- *
+ * A {@link EngineRunnerFactory} which creates
+ * {@link AsynchronousLocalEngineRunner}s.
+ * 
  * @author bfoster (Brian Foster)
+ * @author mattmann (Chris Mattmann)
  */
 public class AsynchronousLocalEngineRunnerFactory implements
-      EngineRunnerFactory {
-
-   private static final String NUM_THREADS_PROPERTY = "org.apache.oodt.cas.workflow.wengine.asynchronous.runner.num.threads";
+    EngineRunnerFactory {
 
-   private int numThreads;
+  private static final String NUM_THREADS_PROPERTY = "org.apache.oodt.cas.workflow.wengine.asynchronous.runner.num.threads";
 
-   public AsynchronousLocalEngineRunnerFactory() {
-      numThreads = Integer.getInteger(NUM_THREADS_PROPERTY,
-            AsynchronousLocalEngineRunner.DEFAULT_NUM_THREADS);
-   }
+  private static final String INSTANCE_REPO_FACTORY_PROPERTY = "workflow.engine.instanceRep.factory";
 
-   @Override
-   public AsynchronousLocalEngineRunner createEngineRunner() {
-      return new AsynchronousLocalEngineRunner(numThreads);
-   }
+  private int numThreads;
 
-   public void setNumThreads(int numThreads) {
-      this.numThreads = numThreads;
-   }
+  public AsynchronousLocalEngineRunnerFactory() {
+    numThreads = Integer.getInteger(NUM_THREADS_PROPERTY,
+        AsynchronousLocalEngineRunner.DEFAULT_NUM_THREADS);
+  }
+
+  @Override
+  public AsynchronousLocalEngineRunner createEngineRunner() {
+    return new AsynchronousLocalEngineRunner(numThreads,
+        getWorkflowInstanceRepository());
+  }
+
+  public void setNumThreads(int numThreads) {
+    this.numThreads = numThreads;
+  }
+
+  protected WorkflowInstanceRepository getWorkflowInstanceRepository() {
+    return GenericWorkflowObjectFactory
+        .getWorkflowInstanceRepositoryFromClassName(System
+            .getProperty(INSTANCE_REPO_FACTORY_PROPERTY));
+  }
 }

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java Thu Sep 20 06:35:44 2012
@@ -18,6 +18,7 @@ package org.apache.oodt.cas.workflow.eng
 
 //OODT imports
 import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
 import org.apache.oodt.cas.workflow.structs.WorkflowTask;
 
 /**
@@ -33,19 +34,17 @@ import org.apache.oodt.cas.workflow.stru
 public abstract class EngineRunner {
 
   /**
-   * Executes a {@link WorkflowTask} on an execution substrate. Ideally there
+   * Executes a {@link TaskProcessor} on an execution substrate. Ideally there
    * will only ever be two of these substrates, one for local execution, and
    * another for communication with the Resource Manager.
    *
-   * @param workflowTask
-   *          The model of the {@link WorkflowTask} to instantiate and execute.
-   * @param dynMetadata
-   *          The dynamic {@link Metadata} passed to this {@link WorkflowTask}.
+   * @param taskProcessor
+   *          The {@link TaskProcessor} to instantiate and execute.
    *
    * @throws Exception
    *           If any error occurs.
    */
-  public abstract void execute(WorkflowTask workflowTask, Metadata dynMetadata)
+  public abstract void execute(TaskProcessor taskProcessor)
       throws Exception;
 
   /**
@@ -59,12 +58,12 @@ public abstract class EngineRunner {
   
   /**
    * Decides whether or not there are available slots within this runner
-   * to execute the provided {@link WorkflowTask}.
+   * to execute the provided {@link TaskProcessor}.
    * 
-   * @param workflowTask The {@link WorkflowTask} to execute.
+   * @param taskProcessor The {@link TaskProcessor} to execute.
    * @return True if there is an open slot, false otherwise.
    * @throws Exception If any error occurs.
    */
-  public abstract boolean hasOpenSlots(WorkflowTask workflowTask) throws Exception;  
+  public abstract boolean hasOpenSlots(TaskProcessor taskProcessor) throws Exception;  
 
 }

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java Thu Sep 20 06:35:44 2012
@@ -23,10 +23,11 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 //OODT imports
-import org.apache.oodt.cas.metadata.Metadata;
 import org.apache.oodt.cas.resource.structs.Job;
 import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
 import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
 import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
 import org.apache.oodt.cas.workflow.structs.TaskJobInput;
 import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
@@ -40,7 +41,7 @@ import org.apache.oodt.cas.workflow.stru
  * @version $Revision$
  * 
  */
-public class ResourceRunner extends EngineRunner implements CoreMetKeys,
+public class ResourceRunner extends AbstractEngineRunnerBase implements CoreMetKeys,
     WorkflowStatus {
 
   private static final Logger LOG = Logger.getLogger(ResourceRunner.class
@@ -52,21 +53,18 @@ public class ResourceRunner extends Engi
 
   private String currentJobId;
 
-  public ResourceRunner(URL resUrl) {
+  public ResourceRunner(URL resUrl, WorkflowInstanceRepository instRep) {
+    super(instRep);
     this.rClient = new XmlRpcResourceManagerClient(resUrl);
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.oodt.cas.workflow.engine.EngineRunner#execute(org.apache.oodt
-   * .cas.workflow.structs.WorkflowTask, org.apache.oodt.cas.metadata.Metadata)
+  /* (non-Javadoc)
+   * @see 
org.apache.oodt.cas.workflow.engine.runner.EngineRunner#execute(org.apache.oodt.cas.workflow.engine.processor.TaskProcessor)
    */
   @Override
-  public void execute(WorkflowTask workflowTask, Metadata dynMetadata)
-      throws Exception {
+  public void execute(TaskProcessor taskProcessor) throws Exception {
     Job workflowTaskJob = new Job();
+    WorkflowTask workflowTask = getTaskFromProcessor(taskProcessor);
     workflowTaskJob.setName(workflowTask.getTaskId());
     workflowTaskJob
         
.setJobInstanceClassName("org.apache.oodt.cas.workflow.structs.TaskJob");
@@ -83,7 +81,7 @@ public class ResourceRunner extends Engi
     }
 
     TaskJobInput in = new TaskJobInput();
-    in.setDynMetadata(dynMetadata);
+    in.setDynMetadata(taskProcessor.getWorkflowInstance().getSharedContext());
     in.setTaskConfig(workflowTask.getTaskConfig());
     
in.setWorkflowTaskInstanceClassName(workflowTask.getTaskInstanceClassName());
 
@@ -110,10 +108,10 @@ public class ResourceRunner extends Engi
   
 
   /* (non-Javadoc)
-   * @see 
org.apache.oodt.cas.workflow.engine.EngineRunner#hasOpenSlots(org.apache.oodt.cas.workflow.structs.WorkflowTask)
+   * @see 
org.apache.oodt.cas.workflow.engine.runner.EngineRunner#hasOpenSlots(org.apache.oodt.cas.workflow.engine.processor.TaskProcessor)
    */
   @Override
-  public boolean hasOpenSlots(WorkflowTask workflowTask) throws Exception {
+  public boolean hasOpenSlots(TaskProcessor taskProcessor) throws Exception {
     // TODO Auto-generated method stub
     return false;
   }
@@ -141,4 +139,5 @@ public class ResourceRunner extends Engi
       return false;
   }
 
+
 }

Modified: 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
--- 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java
 (original)
+++ 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java
 Thu Sep 20 06:35:44 2012
@@ -22,6 +22,9 @@ import java.net.URL;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+
 //Google imports
 import com.google.common.base.Preconditions;
 
@@ -30,6 +33,7 @@ import com.google.common.base.Preconditi
  * Factory which creates {@link ResourceRunner}s.
  *
  * @author bfoster (Brian Foster)
+ * @author mattmann (Chris Mattmann)
  */
 public class ResourceRunnerFactory implements EngineRunnerFactory{
 
@@ -37,6 +41,8 @@ public class ResourceRunnerFactory imple
 
    private static final String RESOURCE_MANAGER_URL_PROPERTY = 
"org.apache.oodt.cas.workflow.engine.resourcemgr.url";
 
+   private static final String INSTANCE_REPO_FACTORY_PROPERTY = 
"workflow.engine.instanceRep.factory";
+   
    private String resUrl;
 
    public ResourceRunnerFactory() {
@@ -49,7 +55,7 @@ public class ResourceRunnerFactory imple
          Preconditions.checkNotNull(resUrl,
                "Must specify Resource Manager URL [property = "
                      + RESOURCE_MANAGER_URL_PROPERTY + "]");
-         return new ResourceRunner(new URL(resUrl));
+         return new ResourceRunner(new URL(resUrl), 
getWorkflowInstanceRepository());
       } catch (MalformedURLException e) {
          LOG.log(Level.SEVERE, "Failed to load ResourceRunner : " + 
e.getMessage(), e);
          return null;
@@ -59,4 +65,10 @@ public class ResourceRunnerFactory imple
    public void setResourceManagerUrl(String resUrl) {
       this.resUrl = resUrl;
    }
+   
+   protected WorkflowInstanceRepository getWorkflowInstanceRepository() {
+     return GenericWorkflowObjectFactory
+         .getWorkflowInstanceRepositoryFromClassName(System
+             .getProperty(INSTANCE_REPO_FACTORY_PROPERTY));
+   }   
 }

Modified: 
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
--- 
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
 (original)
+++ 
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
 Thu Sep 20 06:35:44 2012
@@ -29,8 +29,9 @@ import org.apache.commons.io.FileUtils;
 
 //OODT imports
 import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
 import 
org.apache.oodt.cas.workflow.engine.runner.AsynchronousLocalEngineRunner;
-import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.Priority;
 import org.apache.oodt.commons.date.DateUtils;
 import org.apache.oodt.commons.util.DateConvert;
 
@@ -54,17 +55,31 @@ public class TestAsynchronousLocalEngine
   private AsynchronousLocalEngineRunner runner;
 
   protected File testDir;
-  
+
   private QuerierAndRunnerUtils utils;
 
   public void testRun() {
-    WorkflowTask task = utils.getTask(testDir);
+    TaskProcessor taskProcessor1 = null;
+    TaskProcessor taskProcessor2 = null;
+
+    try {
+      taskProcessor1 = (TaskProcessor) utils.getProcessor(Priority.getDefault()
+          .getValue(), "Executing", "running");
+      taskProcessor2 = (TaskProcessor) utils.getProcessor(Priority.getDefault()
+          .getValue(), "Executing", "running");
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+
     Metadata met = new Metadata();
-    met.addMetadata("StartDateTime", 
DateUtils.toString(Calendar.getInstance()));   
+    met.addMetadata("StartDateTime", 
DateUtils.toString(Calendar.getInstance()));
+
+    taskProcessor1.getWorkflowInstance().getSharedContext().addMetadata(met);
+    taskProcessor2.getWorkflowInstance().getSharedContext().addMetadata(met);
 
     try {
-      runner.execute(task, met);
-      runner.execute(task, met);
+      runner.execute(taskProcessor1);
+      runner.execute(taskProcessor2);
       assertTrue(ranFast());
     } catch (Exception e) {
       e.printStackTrace();
@@ -80,7 +95,8 @@ public class TestAsynchronousLocalEngine
       try {
         String line = FileUtils.readFileToString(f);
         String[] toks = line.split(",");
-        assertEquals("Toks not equal to 2: toks=["+Arrays.asList(toks)+"]", 2, 
toks.length);
+        assertEquals("Toks not equal to 2: toks=[" + Arrays.asList(toks) + "]",
+            2, toks.length);
         Date dateTime = DateConvert.isoParse(toks[1]);
         Seconds seconds = Seconds.secondsBetween(new DateTime(dateTime),
             new DateTime());
@@ -114,8 +130,9 @@ public class TestAsynchronousLocalEngine
    */
   @Override
   protected void setUp() throws Exception {
-    String parentPath = File.createTempFile("test", 
"txt").getParentFile().getAbsolutePath();
-    parentPath = parentPath.endsWith("/") ? parentPath:parentPath + "/";
+    String parentPath = File.createTempFile("test", "txt").getParentFile()
+        .getAbsolutePath();
+    parentPath = parentPath.endsWith("/") ? parentPath : parentPath + "/";
     String testJobDirPath = parentPath + "jobs";
     testDir = new File(testJobDirPath);
     testDir.mkdirs();


Reply via email to