Author: mattmann
Date: Sun Sep 16 21:40:52 2012
New Revision: 1385381

URL: http://svn.apache.org/viewvc?rev=1385381&view=rev
Log:
- fix for OODT-498: Overwrite and bring back 0.3 ThreadPoolWorkflowEngine plus 
patches

Added:
    
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java
Modified:
    oodt/trunk/CHANGES.txt
    
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
    
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java
    
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngineFactory.java
    
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java

Modified: oodt/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1385381&r1=1385380&r2=1385381&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Sun Sep 16 21:40:52 2012
@@ -4,6 +4,9 @@ Apache OODT Change Log
 Release 0.5
 --------------------------------------------
 
+* OODT-498: Overwrite and bring back 0.3 ThreadPoolWorkflowEngine plus 
+  patches (mattmann)
+
 * OODT-500: Rename property for max threads in AsyncLocalEngineRunner 
(mattmann)
 
 * OODT-497: Make WorkflowProcessor PrioritySorters thread-safe (mattmann)

Added: 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java?rev=1385381&view=auto
==============================================================================
--- 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java
 (added)
+++ 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java
 Sun Sep 16 21:40:52 2012
@@ -0,0 +1,591 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//OODT imports
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
+import org.apache.oodt.cas.workflow.structs.TaskJobInput;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
+import org.apache.oodt.cas.workflow.structs.WorkflowConditionInstance;
+import 
org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.commons.util.DateConvert;
+
+//JDK imports
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An instance of the {@link WorkflowProcessorThread} that processes through an
+ * iterative {@link WorkflowInstance}. This class keeps an 
<code>Iterator</code>
+ * that allows it to move from one end of a sequential {@link Workflow}
+ * processing pipeline to another. This class should only be used to process
+ * science pipeline style {@link Workflow}s, i.e., those which resemble an
+ * iterative processing pipelines, with no forks, or concurrent task 
executions.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+
+public class IterativeWorkflowProcessorThread implements WorkflowStatus,
+    CoreMetKeys, Runnable {
+
+  /* the default queue name if we're using resmgr job submission */
+  private static final String DEFAULT_QUEUE_NAME = "high";
+
+  /* an iterator representing the current task that we are on in the workflow 
*/
+  private Iterator taskIterator = null;
+
+  /* the workflow instance that this processor thread is processing */
+  private WorkflowInstance workflowInst = null;
+
+  /* should our workflow processor thread start running? */
+  private boolean running = false;
+
+  /*
+   * the amount of seconds to wait inbetween checking for task pre-condition
+   * satisfaction
+   */
+  private long waitForConditionSatisfy = -1;
+
+  /* our instance repository used to persist workflow instance info */
+  private WorkflowInstanceRepository instanceRepository = null;
+
+  /*
+   * our client to a resource manager: if null, local task execution will be
+   * performed
+   */
+  private XmlRpcResourceManagerClient rClient = null;
+
+  /* polling wait for res mgr */
+  private long pollingWaitTime = 10L;
+
+  /*
+   * should our workflow processor thread pause, and not move onto the next
+   * task?
+   */
+  private boolean pause = false;
+
+  /* our log stream */
+  private static Logger LOG = Logger
+      .getLogger(IterativeWorkflowProcessorThread.class.getName());
+
+  private Map CONDITION_CACHE = new HashMap();
+
+  /* the parent workflow manager url that executed this processor thread */
+  private URL wmgrParentUrl = null;
+
+  /* the currently executing jobId if we're using the resource manager */
+  private String currentJobId = null;
+
+  public IterativeWorkflowProcessorThread(WorkflowInstance wInst,
+      WorkflowInstanceRepository instRep, URL wParentUrl) {
+    workflowInst = wInst;
+    taskIterator = workflowInst.getWorkflow().getTasks().iterator();
+    this.instanceRepository = instRep;
+
+    /* start out the gates running */
+    running = true;
+
+    /*
+     * get the amount of wait time inbetween checking for task pre-condition
+     * satisfaction
+     */
+    waitForConditionSatisfy = Long.getLong(
+        "org.apache.oodt.cas.workflow.engine.preConditionWaitTime", 10)
+        .longValue();
+
+    pollingWaitTime = Long.getLong(
+        "org.apache.oodt.cas.workflow.engine.resourcemgr.pollingWaitTime", 10)
+        .longValue();
+
+    wmgrParentUrl = wParentUrl;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see java.lang.Runnable#run()
+   */
+  public void run() {
+    /*
+     * okay, we got into the run method, mark the start date time for the
+     * workflow instance here
+     */
+    String startDateTimeIsoStr = DateConvert.isoFormat(new Date());
+    workflowInst.setStartDateTimeIsoStr(startDateTimeIsoStr);
+    // persist it
+    persistWorkflowInstance();
+
+    while (running && taskIterator.hasNext()) {
+      if (pause) {
+        LOG.log(Level.FINE,
+            "IterativeWorkflowProcessorThread: Skipping execution: Paused: 
CurrentTask: "
+                + getTaskNameById(workflowInst.getCurrentTaskId()));
+        continue;
+      }
+
+      WorkflowTask task = (WorkflowTask) taskIterator.next();
+      workflowInst.setCurrentTaskId(task.getTaskId());
+
+      // now persist it
+      persistWorkflowInstance();
+
+      // check to see if req met fields are present
+      // if they aren't, set the status to METERROR, and then fail
+      if (!checkTaskRequiredMetadata(task, 
this.workflowInst.getSharedContext())) {
+        this.workflowInst.setStatus(METADATA_MISSING);
+        persistWorkflowInstance();
+        // now break out of this run loop
+        return;
+      }
+
+      // this is where the pre-conditions come in
+      // only execute the below code when it's passed all of its
+      // pre-conditions
+      if (task.getConditions() != null) {
+        while (!satisfied(task.getConditions(), task.getTaskId())
+            && !isStopped()) {
+
+          // if we're not paused, go ahead and pause us now
+          if (!isPaused()) {
+            pause();
+          }
+
+          LOG.log(Level.FINEST,
+              "Pre-conditions for task: " + task.getTaskName()
+                  + " unsatisfied: waiting: " + waitForConditionSatisfy
+                  + " seconds before checking again.");
+          try {
+            Thread.currentThread().sleep(waitForConditionSatisfy * 1000);
+          } catch (InterruptedException ignore) {
+          }
+
+          // check to see if we've been resumed, if so, break
+          // the loop and start
+          if (!isPaused()) {
+            break;
+          }
+        }
+
+        // check to see if we've been killed
+        if (isStopped()) {
+          break;
+        }
+
+        // un pause us (if needed)
+        if (isPaused()) {
+          resume();
+        }
+      }
+
+      // task execution
+      LOG.log(
+          Level.FINEST,
+          "IterativeWorkflowProcessorThread: Executing task: "
+              + task.getTaskName());
+
+      WorkflowTaskInstance taskInstance = GenericWorkflowObjectFactory
+          .getTaskObjectFromClassName(task.getTaskInstanceClassName());
+      // add the TaskId and the JobId and ProcessingNode
+      // TODO: unfake the JobId
+      workflowInst.getSharedContext()
+          .replaceMetadata(TASK_ID, task.getTaskId());
+      workflowInst.getSharedContext().replaceMetadata(WORKFLOW_INST_ID,
+          workflowInst.getId());
+      workflowInst.getSharedContext().replaceMetadata(JOB_ID,
+          workflowInst.getId());
+      workflowInst.getSharedContext().replaceMetadata(PROCESSING_NODE,
+          getHostname());
+      workflowInst.getSharedContext().replaceMetadata(WORKFLOW_MANAGER_URL,
+          this.wmgrParentUrl.toString());
+
+      if (rClient != null) {
+        // build up the Job
+        // and the Job Input
+        Job taskJob = new Job();
+        taskJob.setName(task.getTaskId());
+        taskJob
+            
.setJobInstanceClassName("org.apache.oodt.cas.workflow.structs.TaskJob");
+        taskJob
+            
.setJobInputClassName("org.apache.oodt.cas.workflow.structs.TaskJobInput");
+        taskJob.setLoadValue(new Integer(2));
+        taskJob
+            .setQueueName(task.getTaskConfig().getProperty(QUEUE_NAME) != null 
? task
+                .getTaskConfig().getProperty(QUEUE_NAME) : DEFAULT_QUEUE_NAME);
+
+        TaskJobInput in = new TaskJobInput();
+        in.setDynMetadata(workflowInst.getSharedContext());
+        in.setTaskConfig(task.getTaskConfig());
+        in.setWorkflowTaskInstanceClassName(task.getTaskInstanceClassName());
+
+        workflowInst.setStatus(RESMGR_SUBMIT);
+        persistWorkflowInstance();
+
+        try {
+          // this is * NOT * a blocking operation so when it returns
+          // the job may not actually have finished executing
+          // so we go into a waiting/sleep behavior using the passed
+          // back job id to wait until the job has actually finished
+          // executing
+
+          this.currentJobId = rClient.submitJob(taskJob, in);
+
+          while (!safeCheckJobComplete(this.currentJobId) && !isStopped()) {
+            // sleep for 5 seconds then come back
+            // and check again
+            try {
+              Thread.currentThread().sleep(pollingWaitTime * 1000);
+            } catch (InterruptedException ignore) {
+            }
+          }
+
+          // okay job is done: TODO: fix this hack
+          // the task update time was set remotely
+          // by remote task, so let's read it now
+          // from the instRepo (which will have the updated
+          // time)
+
+          if (isStopped()) {
+            // this means that this workflow was killed, so
+            // gracefully exit
+            break;
+          }
+
+          WorkflowInstance updatedInst = null;
+          try {
+            updatedInst = instanceRepository
+                .getWorkflowInstanceById(workflowInst.getId());
+            workflowInst = updatedInst;
+          } catch (InstanceRepositoryException e) {
+            e.printStackTrace();
+            LOG.log(Level.WARNING, "Unable to get " + "updated workflow "
+                + "instance record " + "when executing remote job: Message: "
+                + e.getMessage());
+          }
+
+        } catch (JobExecutionException e) {
+          LOG.log(Level.WARNING,
+              "Job execution exception using resource manager to execute job: 
Message: "
+                  + e.getMessage());
+        }
+      } else {
+        // we started, so mark it
+        workflowInst.setStatus(STARTED);
+        // go ahead and persist the workflow instance, after we
+        // save the current task start date time
+        String currentTaskIsoStartDateTimeStr = DateConvert
+            .isoFormat(new Date());
+        workflowInst
+            .setCurrentTaskStartDateTimeIsoStr(currentTaskIsoStartDateTimeStr);
+        workflowInst.setCurrentTaskEndDateTimeIsoStr(null); /*
+                                                             * clear this out
+                                                             * until it's ready
+                                                             */
+        persistWorkflowInstance();
+        executeTaskLocally(taskInstance, workflowInst.getSharedContext(),
+            task.getTaskConfig(), task.getTaskName());
+        String currentTaskIsoEndDateTimeStr = DateConvert.isoFormat(new 
Date());
+        workflowInst
+            .setCurrentTaskEndDateTimeIsoStr(currentTaskIsoEndDateTimeStr);
+        persistWorkflowInstance();
+      }
+
+      LOG.log(
+          Level.FINEST,
+          "IterativeWorkflowProcessorThread: Completed task: "
+              + task.getTaskName());
+
+    }
+
+    LOG.log(Level.FINEST,
+        "IterativeWorkflowProcessorThread: Completed workflow: "
+            + workflowInst.getWorkflow().getName());
+    if (!isStopped()) {
+      stop();
+    }
+
+  }
+
+  public WorkflowInstance getWorkflowInstance() {
+    return workflowInst;
+  }
+
+  public synchronized void stop() {
+    running = false;
+    // if the resource manager is active
+    // then kill the current job there
+    if (this.rClient != null && this.currentJobId != null) {
+      if (!this.rClient.killJob(this.currentJobId)) {
+        LOG.log(Level.WARNING, "Attempt to kill " + "current resmgr job: ["
+            + this.currentJobId + "]: failed");
+      }
+    }
+
+    workflowInst.setStatus(FINISHED);
+    String isoEndDateTimeStr = DateConvert.isoFormat(new Date());
+    workflowInst.setEndDateTimeIsoStr(isoEndDateTimeStr);
+    persistWorkflowInstance();
+  }
+
+  public synchronized void resume() {
+    pause = false;
+    workflowInst.setStatus(STARTED);
+    persistWorkflowInstance();
+  }
+
+  public synchronized void pause() {
+    pause = true;
+    workflowInst.setStatus(PAUSED);
+    persistWorkflowInstance();
+  }
+
+  /**
+   * @return True if the WorkflowInstance managed by this processor is paused.
+   */
+  public boolean isPaused() {
+    return pause == true;
+  }
+
+  public boolean isStopped() {
+    return !running;
+  }
+
+  /**
+   * @return Returns the fCurrentTaskId.
+   */
+  public String getCurrentTaskId() {
+    return workflowInst.getCurrentTaskId();
+  }
+
+  /**
+   * @param workflowInst
+   *          The fWorkflowInst to set.
+   */
+  public void setWorkflowInst(WorkflowInstance workflowInst) {
+    workflowInst = workflowInst;
+  }
+
+  /**
+   * @return Returns the waitForConditionSatisfy.
+   */
+  public long getWaitforConditionSatisfy() {
+    return waitForConditionSatisfy;
+  }
+
+  /**
+   * @param waitforConditionSatisfy
+   *          The waitForConditionSatisfy to set.
+   */
+  public void setWaitforConditionSatisfy(long waitforConditionSatisfy) {
+    waitForConditionSatisfy = waitforConditionSatisfy;
+  }
+
+  /**
+   * @return the instRep
+   */
+  public WorkflowInstanceRepository getInstanceRepository() {
+    return instanceRepository;
+  }
+
+  /**
+   * @param instRep
+   *          the instRep to set
+   */
+  public void setInstanceRepository(WorkflowInstanceRepository instRep) {
+    this.instanceRepository = instRep;
+  }
+
+  /**
+   * @return the rClient
+   */
+  public XmlRpcResourceManagerClient getRClient() {
+    return rClient;
+  }
+
+  /**
+   * @param client
+   *          the rClient to set
+   */
+  public void setRClient(XmlRpcResourceManagerClient client) {
+    rClient = client;
+    if (rClient != null) {
+      LOG.log(Level.INFO, "Resource Manager Job Submission enabled to: ["
+          + rClient.getResMgrUrl() + "]");
+    }
+  }
+
+  /**
+   * @return the wmgrParentUrl
+   */
+  public URL getWmgrParentUrl() {
+    return wmgrParentUrl;
+  }
+
+  /**
+   * @param wmgrParentUrl
+   *          the wmgrParentUrl to set
+   */
+  public void setWmgrParentUrl(URL wmgrParentUrl) {
+    this.wmgrParentUrl = wmgrParentUrl;
+  }
+
+  private boolean checkTaskRequiredMetadata(WorkflowTask task,
+      Metadata dynMetadata) {
+    if (task.getRequiredMetFields() == null
+        || (task.getRequiredMetFields() != null && task.getRequiredMetFields()
+            .size() == 0)) {
+      LOG.log(Level.INFO, "Task: [" + task.getTaskName()
+          + "] has no required metadata fields");
+      return true; /* no required metadata, so we're fine */
+    }
+
+    for (Iterator i = task.getRequiredMetFields().iterator(); i.hasNext();) {
+      String reqField = (String) i.next();
+      if (!dynMetadata.containsKey(reqField)) {
+        LOG.log(Level.SEVERE, "Checking metadata key: [" + reqField
+            + "] for task: [" + task.getTaskName()
+            + "]: failed: aborting workflow");
+        return false;
+      }
+    }
+
+    LOG.log(Level.INFO, "All required metadata fields present for task: ["
+        + task.getTaskName() + "]");
+
+    return true;
+  }
+
+  private String getTaskNameById(String taskId) {
+    for (Iterator i = workflowInst.getWorkflow().getTasks().iterator(); i
+        .hasNext();) {
+      WorkflowTask task = (WorkflowTask) i.next();
+      if (task.getTaskId().equals(taskId)) {
+        return task.getTaskName();
+      }
+    }
+
+    return null;
+  }
+
+  private boolean satisfied(List conditionList, String taskId) {
+    for (Iterator i = conditionList.iterator(); i.hasNext();) {
+      WorkflowCondition c = (WorkflowCondition) i.next();
+      WorkflowConditionInstance cInst = null;
+
+      // see if we've already cached this condition instance
+      if (CONDITION_CACHE.get(taskId) != null) {
+        HashMap conditionMap = (HashMap) CONDITION_CACHE.get(taskId);
+
+        /*
+         * okay we have some conditions cached for this task, see if we have 
the
+         * one we need
+         */
+        if (conditionMap.get(c.getConditionId()) != null) {
+          cInst = (WorkflowConditionInstance) conditionMap.get(c
+              .getConditionId());
+        }
+        /* if not, then go ahead and create it and cache it */
+        else {
+          cInst = GenericWorkflowObjectFactory
+              .getConditionObjectFromClassName(c
+                  .getConditionInstanceClassName());
+          conditionMap.put(c.getConditionId(), cInst);
+        }
+      }
+      /* no conditions cached yet, so set everything up */
+      else {
+        HashMap conditionMap = new HashMap();
+        cInst = GenericWorkflowObjectFactory.getConditionObjectFromClassName(c
+            .getConditionInstanceClassName());
+        conditionMap.put(c.getConditionId(), cInst);
+        CONDITION_CACHE.put(taskId, conditionMap);
+      }
+
+      // actually perform the evaluation
+      if (!cInst.evaluate(workflowInst.getSharedContext(), c.getTaskConfig())) 
{
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private String getHostname() {
+    try {
+      // Get hostname by textual representation of IP address
+      InetAddress addr = InetAddress.getLocalHost();
+      // Get the host name
+      String hostname = addr.getHostName();
+      return hostname;
+    } catch (UnknownHostException e) {
+    }
+    return null;
+  }
+
+  private void persistWorkflowInstance() {
+    try {
+      instanceRepository.updateWorkflowInstance(workflowInst);
+    } catch (InstanceRepositoryException e) {
+      LOG.log(Level.WARNING, "Exception persisting workflow instance: ["
+          + workflowInst.getId() + "]: Message: " + e.getMessage());
+    }
+  }
+
+  private void executeTaskLocally(WorkflowTaskInstance instance, Metadata met,
+      WorkflowTaskConfiguration cfg, String taskName) {
+    try {
+      LOG.log(Level.INFO, "Executing task: [" + taskName + "] locally");
+      instance.run(met, cfg);
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOG.log(Level.WARNING, "Exception executing task: [" + taskName
+          + "] locally: Message: " + e.getMessage());
+    }
+  }
+
+  private boolean safeCheckJobComplete(String jobId) {
+    try {
+      return rClient.isJobComplete(jobId);
+    } catch (Exception e) {
+      LOG.log(Level.WARNING, "Exception checking completion status for job: ["
+          + jobId + "]: Messsage: " + e.getMessage());
+      return false;
+    }
+  }
+
+}

Modified: 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java?rev=1385381&r1=1385380&r2=1385381&view=diff
==============================================================================
--- 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
 (original)
+++ 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
 Sun Sep 16 21:40:52 2012
@@ -86,7 +86,6 @@ public class PrioritizedQueueBasedWorkfl
 
   }
 
-  @Override
   public void setEngineRunner(EngineRunner runner) {
     this.runner = runner;
   }

Modified: 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java?rev=1385381&r1=1385380&r2=1385381&view=diff
==============================================================================
--- 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java
 (original)
+++ 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java
 Sun Sep 16 21:40:52 2012
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,16 +19,15 @@ package org.apache.oodt.cas.workflow.eng
 
 //OODT imports
 import org.apache.oodt.cas.metadata.Metadata;
-import org.apache.oodt.cas.workflow.engine.processor.ConditionProcessor;
-import org.apache.oodt.cas.workflow.engine.processor.SequentialProcessor;
-import org.apache.oodt.cas.workflow.engine.runner.EngineRunner;
-import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
-import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
 import org.apache.oodt.cas.workflow.structs.Workflow;
 import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
 import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
 import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
 import 
org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
+import org.apache.oodt.cas.workflow.engine.IterativeWorkflowProcessorThread;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
 import org.apache.oodt.commons.util.DateConvert;
 
 //JDK imports
@@ -46,14 +45,15 @@ import EDU.oswego.cs.dl.util.concurrent.
 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
 
 /**
- *
+ * 
  * The ThreadPooling portion of the WorkflowEngine. This class is meant to be 
an
  * extension point for WorkflowEngines that want to implement ThreadPooling.
  * This WorkflowEngine provides everything needed to manage a ThreadPool using
  * Doug Lea's wonderful java.util.concurrent package that made it into JDK5.
- *
+ * 
  * @author mattmann
- *
+ * @version $Revsion$
+ * 
  */
 public class ThreadPoolWorkflowEngine implements WorkflowEngine, 
WorkflowStatus {
 
@@ -70,21 +70,15 @@ public class ThreadPoolWorkflowEngine im
   /* our instance repository */
   private WorkflowInstanceRepository instRep = null;
 
-  /* the URL pointer to the parent Workflow Manager */
-  private final URL wmgrUrl = null;
-
-  /* how long to wait before checking whether a condition is satisfied. */
-  private final long conditionWait;
-
-  private final ConditionProcessor condProcessor;
+  /* our resource manager client */
+  private XmlRpcResourceManagerClient rClient = null;
 
-  private EngineRunner runner;
-
-  private WorkflowLifecycleManager lifecycleManager;
+  /* the URL pointer to the parent Workflow Manager */
+  private URL wmgrUrl = null;
 
   /**
    * Default Constructor.
-   *
+   * 
    * @param instRep
    *          The WorkflowInstanceRepository to be used by this engine.
    * @param queueSize
@@ -100,13 +94,17 @@ public class ThreadPoolWorkflowEngine im
    * @param unlimitedQueue
    *          Whether or not to use a queue whose bounds are dictated by the
    *          physical memory of the underlying hardware.
+   * @param resUrl
+   *          A URL pointer to a resource manager. If this is set Tasks will be
+   *          wrapped as Resource Manager {@link Job}s and sent through the
+   *          Resource Manager. If this parameter is not set, local execution
+   *          (the default) will be used
    */
   public ThreadPoolWorkflowEngine(WorkflowInstanceRepository instRep,
       int queueSize, int maxPoolSize, int minPoolSize,
-      long threadKeepAliveTime, boolean unlimitedQueue, 
WorkflowLifecycleManager lifecycleManager) {
+      long threadKeepAliveTime, boolean unlimitedQueue, URL resUrl) {
 
     this.instRep = instRep;
-
     Channel c = null;
     if (unlimitedQueue) {
       c = new LinkedQueue();
@@ -117,32 +115,79 @@ public class ThreadPoolWorkflowEngine im
     pool = new PooledExecutor(c, maxPoolSize);
     pool.setMinimumPoolSize(minPoolSize);
     pool.setKeepAliveTime(1000 * 60 * threadKeepAliveTime);
-    this.lifecycleManager = lifecycleManager;
 
     workerMap = new HashMap();
 
-    this.conditionWait = Long.getLong(
-        "org.apache.oodt.cas.workflow.engine.preConditionWaitTime", 10)
-        .longValue();
+    if (resUrl != null)
+      rClient = new XmlRpcResourceManagerClient(resUrl);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#pauseWorkflowInstance
+   * (java.lang.String)
+   */
+  public synchronized void pauseWorkflowInstance(String workflowInstId) {
+    // okay, try and look up that worker thread in our hash map
+    IterativeWorkflowProcessorThread worker = 
(IterativeWorkflowProcessorThread) workerMap
+        .get(workflowInstId);
+    if (worker == null) {
+      LOG.log(Level.WARNING,
+          "WorkflowEngine: Attempt to pause workflow instance id: "
+              + workflowInstId
+              + ", however, this engine is not tracking its execution");
+      return;
+    }
+
+    // otherwise, all good
+    worker.pause();
 
-    this.condProcessor = new ConditionProcessor(lifecycleManager, new 
WorkflowInstance());
   }
 
-  @Override
-  public void setEngineRunner(EngineRunner runner) {
-     this.runner = runner;
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#resumeWorkflowInstance
+   * (java.lang.String)
+   */
+  public synchronized void resumeWorkflowInstance(String workflowInstId) {
+    // okay, try and look up that worker thread in our hash map
+    IterativeWorkflowProcessorThread worker = 
(IterativeWorkflowProcessorThread) workerMap
+        .get(workflowInstId);
+    if (worker == null) {
+      LOG.log(Level.WARNING,
+          "WorkflowEngine: Attempt to resume workflow instance id: "
+              + workflowInstId + ", however, this engine is "
+              + "not tracking its execution");
+      return;
+    }
+
+    // also check to make sure that the worker is currently paused
+    // only can resume WorkflowInstances that are paused, right?
+    if (!worker.isPaused()) {
+      LOG.log(Level.WARNING,
+          "WorkflowEngine: Attempt to resume a workflow that "
+              + "isn't paused currently: instance id: " + workflowInstId);
+      return;
+    }
+
+    // okay, all good
+    worker.resume();
+
   }
 
   /*
    * (non-Javadoc)
-   *
+   * 
    * @see
    * 
org.apache.oodt.cas.workflow.engine.WorkflowEngine#startWorkflow(org.apache
    * .oodt.cas.workflow.structs.Workflow, 
org.apache.oodt.cas.metadata.Metadata)
    */
-  @Override
-  public WorkflowInstance startWorkflow(Workflow workflow, Metadata metadata)
-      throws EngineException {
+  public synchronized WorkflowInstance startWorkflow(Workflow workflow,
+      Metadata metadata) throws EngineException {
     // to start the workflow, we create a default workflow instance
     // populate it
     // persist it
@@ -151,139 +196,161 @@ public class ThreadPoolWorkflowEngine im
 
     WorkflowInstance wInst = new WorkflowInstance();
     wInst.setWorkflow(workflow);
-    wInst.setCurrentTaskId((workflow.getTasks().get(0))
+    wInst.setCurrentTaskId(((WorkflowTask) workflow.getTasks().get(0))
         .getTaskId());
     wInst.setSharedContext(metadata);
     wInst.setStatus(CREATED);
     persistWorkflowInstance(wInst);
 
-    SequentialProcessor worker = new 
SequentialProcessor(this.lifecycleManager, wInst);
-    worker.setWorkflowInstance(wInst);
+    IterativeWorkflowProcessorThread worker = new 
IterativeWorkflowProcessorThread(
+        wInst, instRep, this.wmgrUrl);
+    worker.setRClient(rClient);
     workerMap.put(wInst.getId(), worker);
 
     wInst.setStatus(QUEUED);
     persistWorkflowInstance(wInst);
 
-    /*
-     * try { pool.execute(new ThreadedExecutor(worker, this.condProcessor)); }
-     * catch (InterruptedException e) { throw new EngineException(e); }
-     */
+    try {
+      pool.execute(worker);
+    } catch (InterruptedException e) {
+      throw new EngineException(e);
+    }
 
     return wInst;
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see
-   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#stopWorkflow(java.lang
-   * .String)
-   */
-  @Override
-  public void stopWorkflow(String workflowInstId) {
-    // TODO Auto-generated method stub
-
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see
-   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#pauseWorkflowInstance
-   * (java.lang.String)
-   */
-  @Override
-  public void pauseWorkflowInstance(String workflowInstId) {
-    // TODO Auto-generated method stub
 
   }
 
   /*
    * (non-Javadoc)
-   *
-   * @see
-   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#resumeWorkflowInstance
-   * (java.lang.String)
-   */
-  @Override
-  public void resumeWorkflowInstance(String workflowInstId) {
-    // TODO Auto-generated method stub
-
-  }
-
-  /*
-   * (non-Javadoc)
-   *
+   * 
    * @see
    * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getInstanceRepository()
    */
-  @Override
   public WorkflowInstanceRepository getInstanceRepository() {
     return this.instRep;
   }
 
   /*
    * (non-Javadoc)
-   *
+   * 
    * @see
    * org.apache.oodt.cas.workflow.engine.WorkflowEngine#updateMetadata(java.
    * lang.String, org.apache.oodt.cas.metadata.Metadata)
    */
-  @Override
-  public boolean updateMetadata(String workflowInstId, Metadata met) {
-    // TODO Auto-generated method stub
-    return false;
+  public synchronized boolean updateMetadata(String workflowInstId, Metadata 
met) {
+    // okay, try and look up that worker thread in our hash map
+    IterativeWorkflowProcessorThread worker = 
(IterativeWorkflowProcessorThread) workerMap
+        .get(workflowInstId);
+    if (worker == null) {
+      LOG.log(Level.WARNING,
+          "WorkflowEngine: Attempt to update metadata context "
+              + "for workflow instance id: " + workflowInstId
+              + ", however, this engine is " + "not tracking its execution");
+      return false;
+    }
+
+    worker.getWorkflowInstance().setSharedContext(met);
+    try {
+      persistWorkflowInstance(worker.getWorkflowInstance());
+    } catch (Exception e) {
+      LOG.log(
+          Level.WARNING,
+          "Exception persisting workflow instance: ["
+              + worker.getWorkflowInstance().getId() + "]: Message: "
+              + e.getMessage());
+      return false;
+    }
+
+    return true;
   }
 
   /*
    * (non-Javadoc)
-   *
+   * 
    * @see
    * org.apache.oodt.cas.workflow.engine.WorkflowEngine#setWorkflowManagerUrl
    * (java.net.URL)
    */
-  @Override
   public void setWorkflowManagerUrl(URL url) {
-    // TODO Auto-generated method stub
-
+    this.wmgrUrl = url;
   }
 
   /*
    * (non-Javadoc)
-   *
+   * 
    * @see
-   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWallClockMinutes(
-   * java.lang.String)
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#stopWorkflow(java.lang
+   * .String)
    */
-  @Override
-  public double getWallClockMinutes(String workflowInstId) {
-    // TODO Auto-generated method stub
-    return 0;
+  public synchronized void stopWorkflow(String workflowInstId) {
+    // okay, try and look up that worker thread in our hash map
+    IterativeWorkflowProcessorThread worker = 
(IterativeWorkflowProcessorThread) workerMap
+        .get(workflowInstId);
+    if (worker == null) {
+      LOG.log(Level.WARNING,
+          "WorkflowEngine: Attempt to stop workflow instance id: "
+              + workflowInstId + ", however, this engine is "
+              + "not tracking its execution");
+      return;
+    }
+
+    worker.stop();
   }
 
   /*
    * (non-Javadoc)
-   *
+   * 
    * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#
    * getCurrentTaskWallClockMinutes(java.lang.String)
    */
-  @Override
   public double getCurrentTaskWallClockMinutes(String workflowInstId) {
-    // TODO Auto-generated method stub
-    return 0;
+    // get the workflow instance that we're talking about
+    WorkflowInstance inst = safeGetWorkflowInstanceById(workflowInstId);
+    return getCurrentTaskWallClockMinutes(inst);
   }
 
   /*
    * (non-Javadoc)
-   *
+   * 
    * @see
    * 
org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWorkflowInstanceMetadata
    * (java.lang.String)
    */
-  @Override
   public Metadata getWorkflowInstanceMetadata(String workflowInstId) {
-    // TODO Auto-generated method stub
-    return null;
+    // okay, try and look up that worker thread in our hash map
+    IterativeWorkflowProcessorThread worker = 
(IterativeWorkflowProcessorThread) workerMap
+        .get(workflowInstId);
+    if (worker == null) {
+      // try and get the metadata
+      // from the workflow instance repository (as it was persisted)
+      try {
+        WorkflowInstance inst = 
instRep.getWorkflowInstanceById(workflowInstId);
+        return inst.getSharedContext();
+      } catch (InstanceRepositoryException e) {
+        LOG.log(Level.FINEST, "WorkflowEngine: Attempt to get metadata "
+            + "for workflow instance id: " + workflowInstId
+            + ", however, this engine is "
+            + "not tracking its execution and the id: [" + workflowInstId
+            + "] " + "was never persisted to " + "the instance repository");
+        e.printStackTrace();
+        return new Metadata();
+      }
+    }
+
+    return worker.getWorkflowInstance().getSharedContext();
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWallClockMinutes(
+   * java.lang.String)
+   */
+  public double getWallClockMinutes(String workflowInstId) {
+    // get the workflow instance that we're talking about
+    WorkflowInstance inst = safeGetWorkflowInstanceById(workflowInstId);
+    return getWallClockMinutes(inst);
   }
 
   protected static double getWallClockMinutes(WorkflowInstance inst) {
@@ -366,15 +433,6 @@ public class ThreadPoolWorkflowEngine im
     return diffMins;
   }
 
-  private static Date safeDateConvert(String isoTimeStr) {
-    try {
-      return DateConvert.isoParse(isoTimeStr);
-    } catch (Exception ignore) {
-      ignore.printStackTrace();
-      return null;
-    }
-  }
-
   private synchronized void persistWorkflowInstance(WorkflowInstance wInst)
       throws EngineException {
 
@@ -396,7 +454,21 @@ public class ThreadPoolWorkflowEngine im
 
   }
 
-  // FIXME: add back in the ThreadPoolWorkflowEngine implementation,
-  // after the sub-modules are fixed.
+  private WorkflowInstance safeGetWorkflowInstanceById(String workflowInstId) {
+    try {
+      return instRep.getWorkflowInstanceById(workflowInstId);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  private static Date safeDateConvert(String isoTimeStr) {
+    try {
+      return DateConvert.isoParse(isoTimeStr);
+    } catch (Exception ignore) {
+      ignore.printStackTrace();
+      return null;
+    }
+  }
 
-}
+}
\ No newline at end of file

Modified: 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngineFactory.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngineFactory.java?rev=1385381&r1=1385380&r2=1385381&view=diff
==============================================================================
--- 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngineFactory.java
 (original)
+++ 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngineFactory.java
 Sun Sep 16 21:40:52 2012
@@ -16,15 +16,14 @@
  */
 package org.apache.oodt.cas.workflow.engine;
 
-//JDK static imports
+//JDK imports
 import static java.lang.Boolean.getBoolean;
 import static java.lang.Integer.getInteger;
 import static java.lang.Long.getLong;
+import java.net.URL;
 
 //OODT imports
-import org.apache.oodt.cas.metadata.util.PathUtils;
 import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
-import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
 import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
 
 /**
@@ -41,7 +40,7 @@ public class ThreadPoolWorkflowEngineFac
   private static final String MIN_POOL_SIZE_PROPERTY = 
"org.apache.oodt.cas.workflow.engine.minPoolSize";
   private static final String THREAD_KEEP_ALIVE_PROPERTY = 
"org.apache.oodt.cas.workflow.engine.threadKeepAlive.minutes";
   private static final String UNLIMITED_QUEUE_PROPERTY = 
"org.apache.oodt.cas.workflow.engine.unlimitedQueue";
-  private static final String LIFECYCLES_FILE_PATH_PROPERTY = 
"org.apache.oodt.cas.workflow.lifecycle.filePath";
+  private static final String RESMGR_URL_PROPERTY = 
"org.apache.oodt.cas.workflow.engine.resourcemgr.url";
 
   private static final int DEFAULT_QUEUE_SIZE = 10;
   private static final int DEFAULT_MAX_POOL_SIZE = 10;
@@ -52,7 +51,7 @@ public class ThreadPoolWorkflowEngineFac
   public WorkflowEngine createWorkflowEngine() {
     return new ThreadPoolWorkflowEngine(getWorkflowInstanceRepository(),
         getQueueSize(), getMaxPoolSize(), getMinPoolSize(),
-        getThreadKeepAliveMinutes(), isUnlimitedQueue(), 
getWorkflowLifecycle());
+        getThreadKeepAliveMinutes(), isUnlimitedQueue(), getResmgrUrl());
   }
 
   protected WorkflowInstanceRepository getWorkflowInstanceRepository() {
@@ -61,10 +60,9 @@ public class ThreadPoolWorkflowEngineFac
             .getProperty(INSTANCE_REPO_FACTORY_PROPERTY));
   }
 
-  protected WorkflowLifecycleManager getWorkflowLifecycle() {
+  protected URL getResmgrUrl() {
     try {
-      return new WorkflowLifecycleManager(PathUtils.replaceEnvVariables(System
-          .getProperty(LIFECYCLES_FILE_PATH_PROPERTY)));
+      return new URL(System.getProperty(RESMGR_URL_PROPERTY));
     } catch (Exception e) {
       e.printStackTrace();
       return null;

Modified: 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java?rev=1385381&r1=1385380&r2=1385381&view=diff
==============================================================================
--- 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java
 (original)
+++ 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java
 Sun Sep 16 21:40:52 2012
@@ -20,7 +20,6 @@ package org.apache.oodt.cas.workflow.eng
 import java.net.URL;
 
 //OODT imports
-import org.apache.oodt.cas.workflow.engine.runner.EngineRunner;
 import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
 import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
 import org.apache.oodt.cas.workflow.structs.Workflow;
@@ -38,10 +37,6 @@ public interface WorkflowEngine {
 
     public static final String X_POINT_ID = WorkflowEngine.class.getName();
 
-    /**
-     * @param runner The Runner to use when executing TaskInstances.
-     */
-    public void setEngineRunner(EngineRunner runner);
 
     /**
      * <p>


Reply via email to