Author: mattmann
Date: Sun Sep 16 21:40:52 2012
New Revision: 1385381
URL: http://svn.apache.org/viewvc?rev=1385381&view=rev
Log:
- fix for OODT-498: Overwrite and bring back 0.3 ThreadPoolWorkflowEngine plus
patches
Added:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java
Modified:
oodt/trunk/CHANGES.txt
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngineFactory.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java
Modified: oodt/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1385381&r1=1385380&r2=1385381&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Sun Sep 16 21:40:52 2012
@@ -4,6 +4,9 @@ Apache OODT Change Log
Release 0.5
--------------------------------------------
+* OODT-498: Overwrite and bring back 0.3 ThreadPoolWorkflowEngine plus
+ patches (mattmann)
+
* OODT-500: Rename property for max threads in AsyncLocalEngineRunner
(mattmann)
* OODT-497: Make WorkflowProcessor PrioritySorters thread-safe (mattmann)
Added:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java?rev=1385381&view=auto
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java
(added)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java
Sun Sep 16 21:40:52 2012
@@ -0,0 +1,591 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//OODT imports
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
+import org.apache.oodt.cas.workflow.structs.TaskJobInput;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
+import org.apache.oodt.cas.workflow.structs.WorkflowConditionInstance;
+import
org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.commons.util.DateConvert;
+
+//JDK imports
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An instance of the {@link WorkflowProcessorThread} that processes through an
+ * iterative {@link WorkflowInstance}. This class keeps an
<code>Iterator</code>
+ * that allows it to move from one end of a sequential {@link Workflow}
+ * processing pipeline to another. This class should only be used to process
+ * science pipeline style {@link Workflow}s, i.e., those which resemble an
+ * iterative processing pipelines, with no forks, or concurrent task
executions.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+
+public class IterativeWorkflowProcessorThread implements WorkflowStatus,
+ CoreMetKeys, Runnable {
+
+ /* the default queue name if we're using resmgr job submission */
+ private static final String DEFAULT_QUEUE_NAME = "high";
+
+ /* an iterator representing the current task that we are on in the workflow
*/
+ private Iterator taskIterator = null;
+
+ /* the workflow instance that this processor thread is processing */
+ private WorkflowInstance workflowInst = null;
+
+ /* should our workflow processor thread start running? */
+ private boolean running = false;
+
+ /*
+ * the amount of seconds to wait in between checking for task pre-condition
+ * satisfaction
+ */
+ private long waitForConditionSatisfy = -1;
+
+ /* our instance repository used to persist workflow instance info */
+ private WorkflowInstanceRepository instanceRepository = null;
+
+ /*
+ * our client to a resource manager: if null, local task execution will be
+ * performed
+ */
+ private XmlRpcResourceManagerClient rClient = null;
+
+ /* polling wait for res mgr */
+ private long pollingWaitTime = 10L;
+
+ /*
+ * should our workflow processor thread pause, and not move onto the next
+ * task?
+ */
+ private boolean pause = false;
+
+ /* our log stream */
+ private static Logger LOG = Logger
+ .getLogger(IterativeWorkflowProcessorThread.class.getName());
+
+ private Map CONDITION_CACHE = new HashMap();
+
+ /* the parent workflow manager url that executed this processor thread */
+ private URL wmgrParentUrl = null;
+
+ /* the currently executing jobId if we're using the resource manager */
+ private String currentJobId = null;
+
+ public IterativeWorkflowProcessorThread(WorkflowInstance wInst,
+ WorkflowInstanceRepository instRep, URL wParentUrl) {
+ workflowInst = wInst;
+ taskIterator = workflowInst.getWorkflow().getTasks().iterator();
+ this.instanceRepository = instRep;
+
+ /* start out the gates running */
+ running = true;
+
+ /*
+ * get the amount of wait time in between checking for task pre-condition
+ * satisfaction
+ */
+ waitForConditionSatisfy = Long.getLong(
+ "org.apache.oodt.cas.workflow.engine.preConditionWaitTime", 10)
+ .longValue();
+
+ pollingWaitTime = Long.getLong(
+ "org.apache.oodt.cas.workflow.engine.resourcemgr.pollingWaitTime", 10)
+ .longValue();
+
+ wmgrParentUrl = wParentUrl;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ /*
+ * okay, we got into the run method, mark the start date time for the
+ * workflow instance here
+ */
+ String startDateTimeIsoStr = DateConvert.isoFormat(new Date());
+ workflowInst.setStartDateTimeIsoStr(startDateTimeIsoStr);
+ // persist it
+ persistWorkflowInstance();
+
+ while (running && taskIterator.hasNext()) {
+ if (pause) {
+ LOG.log(Level.FINE,
+ "IterativeWorkflowProcessorThread: Skipping execution: Paused:
CurrentTask: "
+ + getTaskNameById(workflowInst.getCurrentTaskId()));
+ continue;
+ }
+
+ WorkflowTask task = (WorkflowTask) taskIterator.next();
+ workflowInst.setCurrentTaskId(task.getTaskId());
+
+ // now persist it
+ persistWorkflowInstance();
+
+ // check to see if req met fields are present
+ // if they aren't, set the status to METERROR, and then fail
+ if (!checkTaskRequiredMetadata(task,
this.workflowInst.getSharedContext())) {
+ this.workflowInst.setStatus(METADATA_MISSING);
+ persistWorkflowInstance();
+ // now break out of this run loop
+ return;
+ }
+
+ // this is where the pre-conditions come in
+ // only execute the below code when it's passed all of its
+ // pre-conditions
+ if (task.getConditions() != null) {
+ while (!satisfied(task.getConditions(), task.getTaskId())
+ && !isStopped()) {
+
+ // if we're not paused, go ahead and pause us now
+ if (!isPaused()) {
+ pause();
+ }
+
+ LOG.log(Level.FINEST,
+ "Pre-conditions for task: " + task.getTaskName()
+ + " unsatisfied: waiting: " + waitForConditionSatisfy
+ + " seconds before checking again.");
+ try {
+ Thread.currentThread().sleep(waitForConditionSatisfy * 1000);
+ } catch (InterruptedException ignore) {
+ }
+
+ // check to see if we've been resumed, if so, break
+ // the loop and start
+ if (!isPaused()) {
+ break;
+ }
+ }
+
+ // check to see if we've been killed
+ if (isStopped()) {
+ break;
+ }
+
+ // unpause us (if needed)
+ if (isPaused()) {
+ resume();
+ }
+ }
+
+ // task execution
+ LOG.log(
+ Level.FINEST,
+ "IterativeWorkflowProcessorThread: Executing task: "
+ + task.getTaskName());
+
+ WorkflowTaskInstance taskInstance = GenericWorkflowObjectFactory
+ .getTaskObjectFromClassName(task.getTaskInstanceClassName());
+ // add the TaskId and the JobId and ProcessingNode
+ // TODO: unfake the JobId
+ workflowInst.getSharedContext()
+ .replaceMetadata(TASK_ID, task.getTaskId());
+ workflowInst.getSharedContext().replaceMetadata(WORKFLOW_INST_ID,
+ workflowInst.getId());
+ workflowInst.getSharedContext().replaceMetadata(JOB_ID,
+ workflowInst.getId());
+ workflowInst.getSharedContext().replaceMetadata(PROCESSING_NODE,
+ getHostname());
+ workflowInst.getSharedContext().replaceMetadata(WORKFLOW_MANAGER_URL,
+ this.wmgrParentUrl.toString());
+
+ if (rClient != null) {
+ // build up the Job
+ // and the Job Input
+ Job taskJob = new Job();
+ taskJob.setName(task.getTaskId());
+ taskJob
+
.setJobInstanceClassName("org.apache.oodt.cas.workflow.structs.TaskJob");
+ taskJob
+
.setJobInputClassName("org.apache.oodt.cas.workflow.structs.TaskJobInput");
+ taskJob.setLoadValue(new Integer(2));
+ taskJob
+ .setQueueName(task.getTaskConfig().getProperty(QUEUE_NAME) != null
? task
+ .getTaskConfig().getProperty(QUEUE_NAME) : DEFAULT_QUEUE_NAME);
+
+ TaskJobInput in = new TaskJobInput();
+ in.setDynMetadata(workflowInst.getSharedContext());
+ in.setTaskConfig(task.getTaskConfig());
+ in.setWorkflowTaskInstanceClassName(task.getTaskInstanceClassName());
+
+ workflowInst.setStatus(RESMGR_SUBMIT);
+ persistWorkflowInstance();
+
+ try {
+ // this is * NOT * a blocking operation so when it returns
+ // the job may not actually have finished executing
+ // so we go into a waiting/sleep behavior using the passed
+ // back job id to wait until the job has actually finished
+ // executing
+
+ this.currentJobId = rClient.submitJob(taskJob, in);
+
+ while (!safeCheckJobComplete(this.currentJobId) && !isStopped()) {
+ // sleep for 5 seconds then come back
+ // and check again
+ try {
+ Thread.currentThread().sleep(pollingWaitTime * 1000);
+ } catch (InterruptedException ignore) {
+ }
+ }
+
+ // okay job is done: TODO: fix this hack
+ // the task update time was set remotely
+ // by remote task, so let's read it now
+ // from the instRepo (which will have the updated
+ // time)
+
+ if (isStopped()) {
+ // this means that this workflow was killed, so
+ // gracefully exit
+ break;
+ }
+
+ WorkflowInstance updatedInst = null;
+ try {
+ updatedInst = instanceRepository
+ .getWorkflowInstanceById(workflowInst.getId());
+ workflowInst = updatedInst;
+ } catch (InstanceRepositoryException e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING, "Unable to get " + "updated workflow "
+ + "instance record " + "when executing remote job: Message: "
+ + e.getMessage());
+ }
+
+ } catch (JobExecutionException e) {
+ LOG.log(Level.WARNING,
+ "Job execution exception using resource manager to execute job:
Message: "
+ + e.getMessage());
+ }
+ } else {
+ // we started, so mark it
+ workflowInst.setStatus(STARTED);
+ // go ahead and persist the workflow instance, after we
+ // save the current task start date time
+ String currentTaskIsoStartDateTimeStr = DateConvert
+ .isoFormat(new Date());
+ workflowInst
+ .setCurrentTaskStartDateTimeIsoStr(currentTaskIsoStartDateTimeStr);
+ workflowInst.setCurrentTaskEndDateTimeIsoStr(null); /*
+ * clear this out
+ * until it's ready
+ */
+ persistWorkflowInstance();
+ executeTaskLocally(taskInstance, workflowInst.getSharedContext(),
+ task.getTaskConfig(), task.getTaskName());
+ String currentTaskIsoEndDateTimeStr = DateConvert.isoFormat(new
Date());
+ workflowInst
+ .setCurrentTaskEndDateTimeIsoStr(currentTaskIsoEndDateTimeStr);
+ persistWorkflowInstance();
+ }
+
+ LOG.log(
+ Level.FINEST,
+ "IterativeWorkflowProcessorThread: Completed task: "
+ + task.getTaskName());
+
+ }
+
+ LOG.log(Level.FINEST,
+ "IterativeWorkflowProcessorThread: Completed workflow: "
+ + workflowInst.getWorkflow().getName());
+ if (!isStopped()) {
+ stop();
+ }
+
+ }
+
+ public WorkflowInstance getWorkflowInstance() {
+ return workflowInst;
+ }
+
+ public synchronized void stop() {
+ running = false;
+ // if the resource manager is active
+ // then kill the current job there
+ if (this.rClient != null && this.currentJobId != null) {
+ if (!this.rClient.killJob(this.currentJobId)) {
+ LOG.log(Level.WARNING, "Attempt to kill " + "current resmgr job: ["
+ + this.currentJobId + "]: failed");
+ }
+ }
+
+ workflowInst.setStatus(FINISHED);
+ String isoEndDateTimeStr = DateConvert.isoFormat(new Date());
+ workflowInst.setEndDateTimeIsoStr(isoEndDateTimeStr);
+ persistWorkflowInstance();
+ }
+
+ public synchronized void resume() {
+ pause = false;
+ workflowInst.setStatus(STARTED);
+ persistWorkflowInstance();
+ }
+
+ public synchronized void pause() {
+ pause = true;
+ workflowInst.setStatus(PAUSED);
+ persistWorkflowInstance();
+ }
+
+ /**
+ * @return True if the WorkflowInstance managed by this processor is paused.
+ */
+ public boolean isPaused() {
+ return pause == true;
+ }
+
+ public boolean isStopped() {
+ return !running;
+ }
+
+ /**
+ * @return Returns the fCurrentTaskId.
+ */
+ public String getCurrentTaskId() {
+ return workflowInst.getCurrentTaskId();
+ }
+
+ /**
+ * @param workflowInst
+ * The fWorkflowInst to set.
+ */
+ public void setWorkflowInst(WorkflowInstance workflowInst) {
+ workflowInst = workflowInst;
+ }
+
+ /**
+ * @return Returns the waitForConditionSatisfy.
+ */
+ public long getWaitforConditionSatisfy() {
+ return waitForConditionSatisfy;
+ }
+
+ /**
+ * @param waitforConditionSatisfy
+ * The waitForConditionSatisfy to set.
+ */
+ public void setWaitforConditionSatisfy(long waitforConditionSatisfy) {
+ waitForConditionSatisfy = waitforConditionSatisfy;
+ }
+
+ /**
+ * @return the instRep
+ */
+ public WorkflowInstanceRepository getInstanceRepository() {
+ return instanceRepository;
+ }
+
+ /**
+ * @param instRep
+ * the instRep to set
+ */
+ public void setInstanceRepository(WorkflowInstanceRepository instRep) {
+ this.instanceRepository = instRep;
+ }
+
+ /**
+ * @return the rClient
+ */
+ public XmlRpcResourceManagerClient getRClient() {
+ return rClient;
+ }
+
+ /**
+ * @param client
+ * the rClient to set
+ */
+ public void setRClient(XmlRpcResourceManagerClient client) {
+ rClient = client;
+ if (rClient != null) {
+ LOG.log(Level.INFO, "Resource Manager Job Submission enabled to: ["
+ + rClient.getResMgrUrl() + "]");
+ }
+ }
+
+ /**
+ * @return the wmgrParentUrl
+ */
+ public URL getWmgrParentUrl() {
+ return wmgrParentUrl;
+ }
+
+ /**
+ * @param wmgrParentUrl
+ * the wmgrParentUrl to set
+ */
+ public void setWmgrParentUrl(URL wmgrParentUrl) {
+ this.wmgrParentUrl = wmgrParentUrl;
+ }
+
+ private boolean checkTaskRequiredMetadata(WorkflowTask task,
+ Metadata dynMetadata) {
+ if (task.getRequiredMetFields() == null
+ || (task.getRequiredMetFields() != null && task.getRequiredMetFields()
+ .size() == 0)) {
+ LOG.log(Level.INFO, "Task: [" + task.getTaskName()
+ + "] has no required metadata fields");
+ return true; /* no required metadata, so we're fine */
+ }
+
+ for (Iterator i = task.getRequiredMetFields().iterator(); i.hasNext();) {
+ String reqField = (String) i.next();
+ if (!dynMetadata.containsKey(reqField)) {
+ LOG.log(Level.SEVERE, "Checking metadata key: [" + reqField
+ + "] for task: [" + task.getTaskName()
+ + "]: failed: aborting workflow");
+ return false;
+ }
+ }
+
+ LOG.log(Level.INFO, "All required metadata fields present for task: ["
+ + task.getTaskName() + "]");
+
+ return true;
+ }
+
+ private String getTaskNameById(String taskId) {
+ for (Iterator i = workflowInst.getWorkflow().getTasks().iterator(); i
+ .hasNext();) {
+ WorkflowTask task = (WorkflowTask) i.next();
+ if (task.getTaskId().equals(taskId)) {
+ return task.getTaskName();
+ }
+ }
+
+ return null;
+ }
+
+ private boolean satisfied(List conditionList, String taskId) {
+ for (Iterator i = conditionList.iterator(); i.hasNext();) {
+ WorkflowCondition c = (WorkflowCondition) i.next();
+ WorkflowConditionInstance cInst = null;
+
+ // see if we've already cached this condition instance
+ if (CONDITION_CACHE.get(taskId) != null) {
+ HashMap conditionMap = (HashMap) CONDITION_CACHE.get(taskId);
+
+ /*
+ * okay we have some conditions cached for this task, see if we have
the
+ * one we need
+ */
+ if (conditionMap.get(c.getConditionId()) != null) {
+ cInst = (WorkflowConditionInstance) conditionMap.get(c
+ .getConditionId());
+ }
+ /* if not, then go ahead and create it and cache it */
+ else {
+ cInst = GenericWorkflowObjectFactory
+ .getConditionObjectFromClassName(c
+ .getConditionInstanceClassName());
+ conditionMap.put(c.getConditionId(), cInst);
+ }
+ }
+ /* no conditions cached yet, so set everything up */
+ else {
+ HashMap conditionMap = new HashMap();
+ cInst = GenericWorkflowObjectFactory.getConditionObjectFromClassName(c
+ .getConditionInstanceClassName());
+ conditionMap.put(c.getConditionId(), cInst);
+ CONDITION_CACHE.put(taskId, conditionMap);
+ }
+
+ // actually perform the evaluation
+ if (!cInst.evaluate(workflowInst.getSharedContext(), c.getTaskConfig()))
{
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private String getHostname() {
+ try {
+ // Get hostname by textual representation of IP address
+ InetAddress addr = InetAddress.getLocalHost();
+ // Get the host name
+ String hostname = addr.getHostName();
+ return hostname;
+ } catch (UnknownHostException e) {
+ }
+ return null;
+ }
+
+ private void persistWorkflowInstance() {
+ try {
+ instanceRepository.updateWorkflowInstance(workflowInst);
+ } catch (InstanceRepositoryException e) {
+ LOG.log(Level.WARNING, "Exception persisting workflow instance: ["
+ + workflowInst.getId() + "]: Message: " + e.getMessage());
+ }
+ }
+
+ private void executeTaskLocally(WorkflowTaskInstance instance, Metadata met,
+ WorkflowTaskConfiguration cfg, String taskName) {
+ try {
+ LOG.log(Level.INFO, "Executing task: [" + taskName + "] locally");
+ instance.run(met, cfg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING, "Exception executing task: [" + taskName
+ + "] locally: Message: " + e.getMessage());
+ }
+ }
+
+ private boolean safeCheckJobComplete(String jobId) {
+ try {
+ return rClient.isJobComplete(jobId);
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Exception checking completion status for job: ["
+ + jobId + "]: Messsage: " + e.getMessage());
+ return false;
+ }
+ }
+
+}
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java?rev=1385381&r1=1385380&r2=1385381&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
Sun Sep 16 21:40:52 2012
@@ -86,7 +86,6 @@ public class PrioritizedQueueBasedWorkfl
}
- @Override
public void setEngineRunner(EngineRunner runner) {
this.runner = runner;
}
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java?rev=1385381&r1=1385380&r2=1385381&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java
Sun Sep 16 21:40:52 2012
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -19,16 +19,15 @@ package org.apache.oodt.cas.workflow.eng
//OODT imports
import org.apache.oodt.cas.metadata.Metadata;
-import org.apache.oodt.cas.workflow.engine.processor.ConditionProcessor;
-import org.apache.oodt.cas.workflow.engine.processor.SequentialProcessor;
-import org.apache.oodt.cas.workflow.engine.runner.EngineRunner;
-import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
-import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
import org.apache.oodt.cas.workflow.structs.Workflow;
import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
import
org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
+import org.apache.oodt.cas.workflow.engine.IterativeWorkflowProcessorThread;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
import org.apache.oodt.commons.util.DateConvert;
//JDK imports
@@ -46,14 +45,15 @@ import EDU.oswego.cs.dl.util.concurrent.
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
/**
- *
+ *
* The ThreadPooling portion of the WorkflowEngine. This class is meant to be
an
* extension point for WorkflowEngines that want to implement ThreadPooling.
* This WorkflowEngine provides everything needed to manage a ThreadPool using
* Doug Lea's wonderful java.util.concurrent package that made it into JDK5.
- *
+ *
* @author mattmann
- *
+ * @version $Revision$
+ *
*/
public class ThreadPoolWorkflowEngine implements WorkflowEngine,
WorkflowStatus {
@@ -70,21 +70,15 @@ public class ThreadPoolWorkflowEngine im
/* our instance repository */
private WorkflowInstanceRepository instRep = null;
- /* the URL pointer to the parent Workflow Manager */
- private final URL wmgrUrl = null;
-
- /* how long to wait before checking whether a condition is satisfied. */
- private final long conditionWait;
-
- private final ConditionProcessor condProcessor;
+ /* our resource manager client */
+ private XmlRpcResourceManagerClient rClient = null;
- private EngineRunner runner;
-
- private WorkflowLifecycleManager lifecycleManager;
+ /* the URL pointer to the parent Workflow Manager */
+ private URL wmgrUrl = null;
/**
* Default Constructor.
- *
+ *
* @param instRep
* The WorkflowInstanceRepository to be used by this engine.
* @param queueSize
@@ -100,13 +94,17 @@ public class ThreadPoolWorkflowEngine im
* @param unlimitedQueue
* Whether or not to use a queue whose bounds are dictated by the
* physical memory of the underlying hardware.
+ * @param resUrl
+ * A URL pointer to a resource manager. If this is set Tasks will be
+ * wrapped as Resource Manager {@link Job}s and sent through the
+ * Resource Manager. If this parameter is not set, local execution
+ * (the default) will be used
*/
public ThreadPoolWorkflowEngine(WorkflowInstanceRepository instRep,
int queueSize, int maxPoolSize, int minPoolSize,
- long threadKeepAliveTime, boolean unlimitedQueue,
WorkflowLifecycleManager lifecycleManager) {
+ long threadKeepAliveTime, boolean unlimitedQueue, URL resUrl) {
this.instRep = instRep;
-
Channel c = null;
if (unlimitedQueue) {
c = new LinkedQueue();
@@ -117,32 +115,79 @@ public class ThreadPoolWorkflowEngine im
pool = new PooledExecutor(c, maxPoolSize);
pool.setMinimumPoolSize(minPoolSize);
pool.setKeepAliveTime(1000 * 60 * threadKeepAliveTime);
- this.lifecycleManager = lifecycleManager;
workerMap = new HashMap();
- this.conditionWait = Long.getLong(
- "org.apache.oodt.cas.workflow.engine.preConditionWaitTime", 10)
- .longValue();
+ if (resUrl != null)
+ rClient = new XmlRpcResourceManagerClient(resUrl);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngine#pauseWorkflowInstance
+ * (java.lang.String)
+ */
+ public synchronized void pauseWorkflowInstance(String workflowInstId) {
+ // okay, try and look up that worker thread in our hash map
+ IterativeWorkflowProcessorThread worker =
(IterativeWorkflowProcessorThread) workerMap
+ .get(workflowInstId);
+ if (worker == null) {
+ LOG.log(Level.WARNING,
+ "WorkflowEngine: Attempt to pause workflow instance id: "
+ + workflowInstId
+ + ", however, this engine is not tracking its execution");
+ return;
+ }
+
+ // otherwise, all good
+ worker.pause();
- this.condProcessor = new ConditionProcessor(lifecycleManager, new
WorkflowInstance());
}
- @Override
- public void setEngineRunner(EngineRunner runner) {
- this.runner = runner;
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngine#resumeWorkflowInstance
+ * (java.lang.String)
+ */
+ public synchronized void resumeWorkflowInstance(String workflowInstId) {
+ // okay, try and look up that worker thread in our hash map
+ IterativeWorkflowProcessorThread worker =
(IterativeWorkflowProcessorThread) workerMap
+ .get(workflowInstId);
+ if (worker == null) {
+ LOG.log(Level.WARNING,
+ "WorkflowEngine: Attempt to resume workflow instance id: "
+ + workflowInstId + ", however, this engine is "
+ + "not tracking its execution");
+ return;
+ }
+
+ // also check to make sure that the worker is currently paused
+ // only can resume WorkflowInstances that are paused, right?
+ if (!worker.isPaused()) {
+ LOG.log(Level.WARNING,
+ "WorkflowEngine: Attempt to resume a workflow that "
+ + "isn't paused currently: instance id: " + workflowInstId);
+ return;
+ }
+
+ // okay, all good
+ worker.resume();
+
}
/*
* (non-Javadoc)
- *
+ *
* @see
*
org.apache.oodt.cas.workflow.engine.WorkflowEngine#startWorkflow(org.apache
* .oodt.cas.workflow.structs.Workflow,
org.apache.oodt.cas.metadata.Metadata)
*/
- @Override
- public WorkflowInstance startWorkflow(Workflow workflow, Metadata metadata)
- throws EngineException {
+ public synchronized WorkflowInstance startWorkflow(Workflow workflow,
+ Metadata metadata) throws EngineException {
// to start the workflow, we create a default workflow instance
// populate it
// persist it
@@ -151,139 +196,161 @@ public class ThreadPoolWorkflowEngine im
WorkflowInstance wInst = new WorkflowInstance();
wInst.setWorkflow(workflow);
- wInst.setCurrentTaskId((workflow.getTasks().get(0))
+ wInst.setCurrentTaskId(((WorkflowTask) workflow.getTasks().get(0))
.getTaskId());
wInst.setSharedContext(metadata);
wInst.setStatus(CREATED);
persistWorkflowInstance(wInst);
- SequentialProcessor worker = new
SequentialProcessor(this.lifecycleManager, wInst);
- worker.setWorkflowInstance(wInst);
+ IterativeWorkflowProcessorThread worker = new
IterativeWorkflowProcessorThread(
+ wInst, instRep, this.wmgrUrl);
+ worker.setRClient(rClient);
workerMap.put(wInst.getId(), worker);
wInst.setStatus(QUEUED);
persistWorkflowInstance(wInst);
- /*
- * try { pool.execute(new ThreadedExecutor(worker, this.condProcessor)); }
- * catch (InterruptedException e) { throw new EngineException(e); }
- */
+ try {
+ pool.execute(worker);
+ } catch (InterruptedException e) {
+ throw new EngineException(e);
+ }
return wInst;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.oodt.cas.workflow.engine.WorkflowEngine#stopWorkflow(java.lang
- * .String)
- */
- @Override
- public void stopWorkflow(String workflowInstId) {
- // TODO Auto-generated method stub
-
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.oodt.cas.workflow.engine.WorkflowEngine#pauseWorkflowInstance
- * (java.lang.String)
- */
- @Override
- public void pauseWorkflowInstance(String workflowInstId) {
- // TODO Auto-generated method stub
}
/*
* (non-Javadoc)
- *
- * @see
- * org.apache.oodt.cas.workflow.engine.WorkflowEngine#resumeWorkflowInstance
- * (java.lang.String)
- */
- @Override
- public void resumeWorkflowInstance(String workflowInstId) {
- // TODO Auto-generated method stub
-
- }
-
- /*
- * (non-Javadoc)
- *
+ *
* @see
* org.apache.oodt.cas.workflow.engine.WorkflowEngine#getInstanceRepository()
*/
- @Override
public WorkflowInstanceRepository getInstanceRepository() {
return this.instRep;
}
/*
* (non-Javadoc)
- *
+ *
* @see
* org.apache.oodt.cas.workflow.engine.WorkflowEngine#updateMetadata(java.
* lang.String, org.apache.oodt.cas.metadata.Metadata)
*/
- @Override
- public boolean updateMetadata(String workflowInstId, Metadata met) {
- // TODO Auto-generated method stub
- return false;
+ public synchronized boolean updateMetadata(String workflowInstId, Metadata
met) {
+ // okay, try and look up that worker thread in our hash map
+ IterativeWorkflowProcessorThread worker =
(IterativeWorkflowProcessorThread) workerMap
+ .get(workflowInstId);
+ if (worker == null) {
+ LOG.log(Level.WARNING,
+ "WorkflowEngine: Attempt to update metadata context "
+ + "for workflow instance id: " + workflowInstId
+ + ", however, this engine is " + "not tracking its execution");
+ return false;
+ }
+
+ worker.getWorkflowInstance().setSharedContext(met);
+ try {
+ persistWorkflowInstance(worker.getWorkflowInstance());
+ } catch (Exception e) {
+ LOG.log(
+ Level.WARNING,
+ "Exception persisting workflow instance: ["
+ + worker.getWorkflowInstance().getId() + "]: Message: "
+ + e.getMessage());
+ return false;
+ }
+
+ return true;
}
/*
* (non-Javadoc)
- *
+ *
* @see
* org.apache.oodt.cas.workflow.engine.WorkflowEngine#setWorkflowManagerUrl
* (java.net.URL)
*/
- @Override
public void setWorkflowManagerUrl(URL url) {
- // TODO Auto-generated method stub
-
+ this.wmgrUrl = url;
}
/*
* (non-Javadoc)
- *
+ *
* @see
- * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWallClockMinutes(
- * java.lang.String)
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngine#stopWorkflow(java.lang
+ * .String)
*/
- @Override
- public double getWallClockMinutes(String workflowInstId) {
- // TODO Auto-generated method stub
- return 0;
+ public synchronized void stopWorkflow(String workflowInstId) {
+ // okay, try and look up that worker thread in our hash map
+ IterativeWorkflowProcessorThread worker =
(IterativeWorkflowProcessorThread) workerMap
+ .get(workflowInstId);
+ if (worker == null) {
+ LOG.log(Level.WARNING,
+ "WorkflowEngine: Attempt to stop workflow instance id: "
+ + workflowInstId + ", however, this engine is "
+ + "not tracking its execution");
+ return;
+ }
+
+ worker.stop();
}
/*
* (non-Javadoc)
- *
+ *
* @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#
* getCurrentTaskWallClockMinutes(java.lang.String)
*/
- @Override
public double getCurrentTaskWallClockMinutes(String workflowInstId) {
- // TODO Auto-generated method stub
- return 0;
+ // get the workflow instance that we're talking about
+ WorkflowInstance inst = safeGetWorkflowInstanceById(workflowInstId);
+ return getCurrentTaskWallClockMinutes(inst);
}
/*
* (non-Javadoc)
- *
+ *
* @see
*
org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWorkflowInstanceMetadata
* (java.lang.String)
*/
- @Override
public Metadata getWorkflowInstanceMetadata(String workflowInstId) {
- // TODO Auto-generated method stub
- return null;
+ // okay, try and look up that worker thread in our hash map
+ IterativeWorkflowProcessorThread worker =
(IterativeWorkflowProcessorThread) workerMap
+ .get(workflowInstId);
+ if (worker == null) {
+ // try and get the metadata
+ // from the workflow instance repository (as it was persisted)
+ try {
+ WorkflowInstance inst =
instRep.getWorkflowInstanceById(workflowInstId);
+ return inst.getSharedContext();
+ } catch (InstanceRepositoryException e) {
+ LOG.log(Level.FINEST, "WorkflowEngine: Attempt to get metadata "
+ + "for workflow instance id: " + workflowInstId
+ + ", however, this engine is "
+ + "not tracking its execution and the id: [" + workflowInstId
+ + "] " + "was never persisted to " + "the instance repository");
+ e.printStackTrace();
+ return new Metadata();
+ }
+ }
+
+ return worker.getWorkflowInstance().getSharedContext();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWallClockMinutes(
+ * java.lang.String)
+ */
+ public double getWallClockMinutes(String workflowInstId) {
+ // get the workflow instance that we're talking about
+ WorkflowInstance inst = safeGetWorkflowInstanceById(workflowInstId);
+ return getWallClockMinutes(inst);
}
protected static double getWallClockMinutes(WorkflowInstance inst) {
@@ -366,15 +433,6 @@ public class ThreadPoolWorkflowEngine im
return diffMins;
}
- private static Date safeDateConvert(String isoTimeStr) {
- try {
- return DateConvert.isoParse(isoTimeStr);
- } catch (Exception ignore) {
- ignore.printStackTrace();
- return null;
- }
- }
-
private synchronized void persistWorkflowInstance(WorkflowInstance wInst)
throws EngineException {
@@ -396,7 +454,21 @@ public class ThreadPoolWorkflowEngine im
}
- // FIXME: add back in the ThreadPoolWorkflowEngine implementation,
- // after the sub-modules are fixed.
+ private WorkflowInstance safeGetWorkflowInstanceById(String workflowInstId) {
+ try {
+ return instRep.getWorkflowInstanceById(workflowInstId);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ private static Date safeDateConvert(String isoTimeStr) {
+ try {
+ return DateConvert.isoParse(isoTimeStr);
+ } catch (Exception ignore) {
+ ignore.printStackTrace();
+ return null;
+ }
+ }
-}
+}
\ No newline at end of file
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngineFactory.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngineFactory.java?rev=1385381&r1=1385380&r2=1385381&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngineFactory.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngineFactory.java
Sun Sep 16 21:40:52 2012
@@ -16,15 +16,14 @@
*/
package org.apache.oodt.cas.workflow.engine;
-//JDK static imports
+//JDK imports
import static java.lang.Boolean.getBoolean;
import static java.lang.Integer.getInteger;
import static java.lang.Long.getLong;
+import java.net.URL;
//OODT imports
-import org.apache.oodt.cas.metadata.util.PathUtils;
import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
-import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
/**
@@ -41,7 +40,7 @@ public class ThreadPoolWorkflowEngineFac
private static final String MIN_POOL_SIZE_PROPERTY =
"org.apache.oodt.cas.workflow.engine.minPoolSize";
private static final String THREAD_KEEP_ALIVE_PROPERTY =
"org.apache.oodt.cas.workflow.engine.threadKeepAlive.minutes";
private static final String UNLIMITED_QUEUE_PROPERTY =
"org.apache.oodt.cas.workflow.engine.unlimitedQueue";
- private static final String LIFECYCLES_FILE_PATH_PROPERTY =
"org.apache.oodt.cas.workflow.lifecycle.filePath";
+ private static final String RESMGR_URL_PROPERTY =
"org.apache.oodt.cas.workflow.engine.resourcemgr.url";
private static final int DEFAULT_QUEUE_SIZE = 10;
private static final int DEFAULT_MAX_POOL_SIZE = 10;
@@ -52,7 +51,7 @@ public class ThreadPoolWorkflowEngineFac
public WorkflowEngine createWorkflowEngine() {
return new ThreadPoolWorkflowEngine(getWorkflowInstanceRepository(),
getQueueSize(), getMaxPoolSize(), getMinPoolSize(),
- getThreadKeepAliveMinutes(), isUnlimitedQueue(),
getWorkflowLifecycle());
+ getThreadKeepAliveMinutes(), isUnlimitedQueue(), getResmgrUrl());
}
protected WorkflowInstanceRepository getWorkflowInstanceRepository() {
@@ -61,10 +60,9 @@ public class ThreadPoolWorkflowEngineFac
.getProperty(INSTANCE_REPO_FACTORY_PROPERTY));
}
- protected WorkflowLifecycleManager getWorkflowLifecycle() {
+ protected URL getResmgrUrl() {
try {
- return new WorkflowLifecycleManager(PathUtils.replaceEnvVariables(System
- .getProperty(LIFECYCLES_FILE_PATH_PROPERTY)));
+ return new URL(System.getProperty(RESMGR_URL_PROPERTY));
} catch (Exception e) {
e.printStackTrace();
return null;
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java?rev=1385381&r1=1385380&r2=1385381&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java
Sun Sep 16 21:40:52 2012
@@ -20,7 +20,6 @@ package org.apache.oodt.cas.workflow.eng
import java.net.URL;
//OODT imports
-import org.apache.oodt.cas.workflow.engine.runner.EngineRunner;
import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
import org.apache.oodt.cas.workflow.structs.Workflow;
@@ -38,10 +37,6 @@ public interface WorkflowEngine {
public static final String X_POINT_ID = WorkflowEngine.class.getName();
- /**
- * @param runner The Runner to use when executing TaskInstances.
- */
- public void setEngineRunner(EngineRunner runner);
/**
* <p>