Author: mattmann
Date: Sun Sep 16 19:43:07 2012
New Revision: 1385358
URL: http://svn.apache.org/viewvc?rev=1385358&view=rev
Log:
- OODT-495 WIP: test Workflow (hello world/goodbye world) now runs to
completion!
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/TaskProcessor.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycleStage.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowState.java
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java?rev=1385358&r1=1385357&r2=1385358&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
Sun Sep 16 19:43:07 2012
@@ -106,10 +106,17 @@ public class TaskQuerier implements Runn
processor.getLifecycleManager());
WorkflowLifecycle lifecycle = helper
.getLifecycleForProcessor(processor);
+
+ LOG.log(Level.FINE, "TaskQuerier: dispositioning processor with id: ["
+ + processor.getWorkflowInstance().getId() + "]: state: "
+ + processor.getWorkflowInstance().getState());
+
if (!(processor.getWorkflowInstance().getState().getCategory()
.getName().equals("done") || processor.getWorkflowInstance()
- .getState().getCategory().getName().equals("holding")) &&
-
!processor.getWorkflowInstance().getState().getName().equals("Executing")) {
+ .getState().getCategory().getName().equals("holding"))
+ && !processor.getWorkflowInstance().getState().getName()
+ .equals("Executing") && processor.getRunnableWorkflowProcessors()
!= null
+ && processor.getRunnableWorkflowProcessors().size() > 0) {
for (TaskProcessor tp : processor.getRunnableWorkflowProcessors()) {
WorkflowState state = lifecycle.createState("Executing", "running",
"Added to Runnable queue");
@@ -120,7 +127,7 @@ public class TaskQuerier implements Runn
processorsToRun.add(tp);
}
- if(processorsToRun != null && processorsToRun.size() > 1){
+ if (processorsToRun != null && processorsToRun.size() > 1) {
prioritizer.sort(processorsToRun);
}
@@ -131,9 +138,11 @@ public class TaskQuerier implements Runn
} else {
// simply call nextState and persist it
- LOG.log(Level.FINE, "Processor for workflow instance: ["
- + processor.getWorkflowInstance().getId()
- + "] not ready to Execute or already Executing: advancing it to
next state.");
+ LOG.log(
+ Level.FINE,
+ "Processor for workflow instance: ["
+ + processor.getWorkflowInstance().getId()
+ + "] not ready to Execute or already Executing: advancing it
to next state.");
processor.nextState();
persist(processor.getWorkflowInstance());
}
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/TaskProcessor.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/TaskProcessor.java?rev=1385358&r1=1385357&r2=1385358&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/TaskProcessor.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/TaskProcessor.java
Sun Sep 16 19:43:07 2012
@@ -91,13 +91,8 @@ public class TaskProcessor extends Workf
.getTimeInMillis()) / 1000) / 60;
if (elapsedTime >= requiredBlockTimeElapse)
tps.add(this);
- } else if (((this.getWorkflowInstance().getState().getName()
- .equals("Queued") || this.getWorkflowInstance().getState()
- .getName().equals("Loaded") || this.getWorkflowInstance().getState()
- .getName().equals("Executing"))
- && this.passedPreConditions())
- || this.getWorkflowInstance().getState().getName()
- .equals("PreConditionSuccess")) {
+ } else if (this.isAnyState("Loaded", "Queued", "PreConditionSuccess") &&
+ !this.isAnyState("Executing") && this.passedPreConditions()){
tps.add(this);
}
}
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java?rev=1385358&r1=1385357&r2=1385358&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java
Sun Sep 16 19:43:07 2012
@@ -50,14 +50,19 @@ public abstract class WorkflowProcessor
private WorkflowInstance workflowInstance;
private WorkflowProcessor preConditions;
private WorkflowProcessor postConditions;
- private List<String> excusedSubProcessorIds; //FIXME: read this in
PackagedRepo: flow through instance
+ private List<String> excusedSubProcessorIds; // FIXME: read this in
+ // PackagedRepo: flow through
+ // instance
private List<WorkflowProcessor> subProcessors;
private List<WorkflowProcessorListener> listeners;
- private int minReqSuccessfulSubProcessors; //FIXME: read this in
PackagedRepo: flow through instance
+ private int minReqSuccessfulSubProcessors; // FIXME: read this in
+ // PackagedRepo: flow through
+ // instance
protected WorkflowLifecycleManager lifecycleManager;
protected WorkflowProcessorHelper helper;
-
- public WorkflowProcessor(WorkflowLifecycleManager lifecycleManager,
WorkflowInstance workflowInstance){
+
+ public WorkflowProcessor(WorkflowLifecycleManager lifecycleManager,
+ WorkflowInstance workflowInstance) {
this.subProcessors = new Vector<WorkflowProcessor>();
this.listeners = new Vector<WorkflowProcessorListener>();
this.excusedSubProcessorIds = new Vector<String>();
@@ -65,7 +70,9 @@ public abstract class WorkflowProcessor
this.lifecycleManager = lifecycleManager;
this.workflowInstance = workflowInstance;
this.helper = new WorkflowProcessorHelper(lifecycleManager);
- WorkflowState initState =
helper.getLifecycleForProcessor(this).createState("Null", "initial", "Instance
created by workflow processor.");
+ WorkflowState initState = helper.getLifecycleForProcessor(this)
+ .createState("Null", "initial",
+ "Instance created by workflow processor.");
this.workflowInstance.setState(initState);
}
@@ -143,7 +150,7 @@ public abstract class WorkflowProcessor
public void setMinReqSuccessfulSubProcessors(int
minReqSuccessfulSubProcessors) {
this.minReqSuccessfulSubProcessors = minReqSuccessfulSubProcessors;
}
-
+
/**
* @return the lifecycleManager
*/
@@ -152,12 +159,12 @@ public abstract class WorkflowProcessor
}
/**
- * @param lifecycleManager the lifecycleManager to set
+ * @param lifecycleManager
+ * the lifecycleManager to set
*/
public void setLifecycleManager(WorkflowLifecycleManager lifecycleManager) {
this.lifecycleManager = lifecycleManager;
- }
-
+ }
/**
* @return the preConditions
@@ -166,15 +173,14 @@ public abstract class WorkflowProcessor
return preConditions;
}
-
/**
- * @param preConditions the preConditions to set
+ * @param preConditions
+ * the preConditions to set
*/
public void setPreConditions(WorkflowProcessor preConditions) {
this.preConditions = preConditions;
}
-
/**
* @return the postConditions
*/
@@ -182,13 +188,13 @@ public abstract class WorkflowProcessor
return postConditions;
}
-
/**
- * @param postConditions the postConditions to set
+ * @param postConditions
+ * the postConditions to set
*/
public void setPostConditions(WorkflowProcessor postConditions) {
this.postConditions = postConditions;
- }
+ }
/*
* (non-Javadoc)
@@ -197,8 +203,8 @@ public abstract class WorkflowProcessor
*/
@Override
public int compareTo(WorkflowProcessor workflowProcessor) {
- return this.getWorkflowInstance().getPriority().
- compareTo(workflowProcessor.getWorkflowInstance().getPriority());
+ return this.getWorkflowInstance().getPriority()
+ .compareTo(workflowProcessor.getWorkflowInstance().getPriority());
}
/*
@@ -245,10 +251,73 @@ public abstract class WorkflowProcessor
return runnableTasks;
}
+ /**
+ * Advances this WorkflowProcessor to its next {@link WorkflowState}.
+ */
+ public void nextState() {
+ if (this.workflowInstance != null
+ && this.workflowInstance.getState() != null) {
+ WorkflowState currState = this.workflowInstance.getState();
+ WorkflowState nextState = null;
+ if (currState.getName().equals("Null")) {
+ nextState = this.helper.getLifecycleForProcessor(this).createState(
+ "Loaded",
+ "initial",
+ "Workflow Processor: nextState: " + "loading workflow instance: ["
+ + this.workflowInstance.getId() + "]");
+ } else if (currState.getName().equals("Loaded")) {
+ nextState = this.helper.getLifecycleForProcessor(this).createState(
+ "Queued",
+ "initial",
+ "Workflow Processor: nextState: " + "queueing instance: ["
+ + this.workflowInstance.getId() + "]");
+ } else if (currState.getName().equals("Queued")) {
+ if (!this.passedPreConditions()) {
+ nextState = this.helper.getLifecycleForProcessor(this).createState(
+ "PreConditionEval",
+ "running",
+ "Workflow Processor: nextState: "
+ + "running preconditiosn for workflow instance: ["
+ + this.workflowInstance.getId() + "]");
+ } else {
+ if (this.getRunnableWorkflowProcessors() != null
+ && this.getRunnableWorkflowProcessors().size() == 0
+ && this.passedPostConditions()) {
+ nextState = this.helper.getLifecycleForProcessor(this).createState(
+ "Success",
+ "done",
+ "Workflow Processor: nextState: " + "workflow instance: ["
+ + this.workflowInstance.getId()
+ + "] completed successfully");
+ }
+ }
+ } else if (currState.getName().equals("Executing")) {
+ nextState = this.helper.getLifecycleForProcessor(this).createState(
+ "Success",
+ "done",
+ "Workflow Processor: nextState: " + "workflow instance: ["
+ + this.workflowInstance.getId() + "] completed successfully");
+ }
+
+ if (nextState != null) {
+ this.workflowInstance.setState(nextState);
+ }
+
+ } else {
+ this.workflowInstance.setState(helper.getLifecycleForProcessor(this)
+ .createState(
+ "Unknown",
+ "holding",
+ "The Workflow Processor for instance : ["
+ + this.getWorkflowInstance().getId() + "] "
+ + "had a null state"));
+ }
+ }
+
protected boolean passedPreConditions() {
if (this.getPreConditions() != null) {
- return this.getPreConditions().getWorkflowInstance().
- getState().getName().equals("Success");
+ return this.getPreConditions().getWorkflowInstance().getState().getName()
+ .equals("Success");
} else {
return true;
}
@@ -256,8 +325,8 @@ public abstract class WorkflowProcessor
protected boolean passedPostConditions() {
if (this.getPostConditions() != null) {
- return this.getPostConditions().getWorkflowInstance().
- getState().getName().equals("Success");
+ return this.getPostConditions().getWorkflowInstance().getState()
+ .getName().equals("Success");
} else {
return true;
}
@@ -280,12 +349,13 @@ public abstract class WorkflowProcessor
*/
protected WorkflowState isDone() {
if (this.helper.containsCategory(this.getSubProcessors(), "done")) {
- List<WorkflowProcessor> failedSubProcessors =
this.helper.getWorkflowProcessorsByState(
- this.getSubProcessors(), "Failure");
+ List<WorkflowProcessor> failedSubProcessors = this.helper
+ .getWorkflowProcessorsByState(this.getSubProcessors(), "Failure");
if (this.minReqSuccessfulSubProcessors != -1
&& failedSubProcessors.size() > (this.getSubProcessors().size() -
this.minReqSuccessfulSubProcessors))
- return
lifecycleManager.getDefaultLifecycle().createState("ResultsFailure",
- "results", "More than the allowed number of sub-processors
failed");
+ return lifecycleManager.getDefaultLifecycle().createState(
+ "ResultsFailure", "results",
+ "More than the allowed number of sub-processors failed");
for (WorkflowProcessor subProcessor : failedSubProcessors) {
if (!this.getExcusedSubProcessorIds().contains(
subProcessor.getWorkflowInstance().getId())) {
@@ -296,7 +366,8 @@ public abstract class WorkflowProcessor
+ "] failed.");
}
}
- if (this.helper.allProcessorsSameCategory(this.getSubProcessors(),
"done"))
+ if (this.helper
+ .allProcessorsSameCategory(this.getSubProcessors(), "done"))
return lifecycleManager.getDefaultLifecycle().createState(
"ResultsSuccess",
"results",
@@ -309,53 +380,16 @@ public abstract class WorkflowProcessor
"All sub-processors for Workflow Processor handling workflow id: ["
+ workflowInstance.getId() + "] are " + "not complete");
}
-
-
- public void nextState(){
- if(this.workflowInstance != null && this.workflowInstance.getState() !=
null){
- WorkflowState currState = this.workflowInstance.getState();
- WorkflowState nextState = null;
- if(currState.getName().equals("Null")){
- nextState = this.helper.getLifecycleForProcessor(this)
- .createState("Loaded", "initial", "Workflow Processor: nextState: "
+
- "loading workflow instance:
["+this.workflowInstance.getId()+"]");
- }
- else if(currState.getName().equals("Loaded")){
- nextState = this.helper.getLifecycleForProcessor(this)
- .createState("Queued", "initial", "Workflow Processor: nextState: " +
- "queueing instance: ["+this.workflowInstance.getId()+"]");
- }
- else if(currState.getName().equals("Queued")){
- if(!this.passedPreConditions()){
- nextState = this.helper.getLifecycleForProcessor(this)
- .createState("PreConditionEval", "running", "Workflow Processor:
nextState: " +
- "running preconditiosn for workflow instance:
["+this.workflowInstance.getId()+"]");
- }
- }
- else if(currState.getName().equals("Executing")){
- nextState = this.helper.getLifecycleForProcessor(this)
- .createState("Success", "done", "Workflow Processor: nextState: " +
- "workflow instance: ["+this.workflowInstance.getId()+"] completed
successfully");
- }
-
- if(nextState != null){
- this.workflowInstance.setState(nextState);
- }
- else{
- nextState = this.helper.getLifecycleForProcessor(this)
- .createState("Unknown", "holding", "Unable to determine next
state " +
- "for workflow processor for instance:
["+this.workflowInstance.getId()+"]: " +
- "current state:
["+currState.getName()+"]");
- }
- this.workflowInstance.setState(nextState);
- }
- else{
-
this.workflowInstance.setState(helper.getLifecycleForProcessor(this).createState("Unknown",
- "holding", "The Workflow Processor for instance :
["+this.getWorkflowInstance().getId()+"] " +
- "had a null state"));
+
+ protected boolean isAnyState(String... states) {
+ for (String state : states) {
+ if (this.getWorkflowInstance().getState().getName().equals(state)) {
+ return true;
+ }
}
- }
+ return false;
+ }
/**
* This is the core method of the WorkflowProcessor class in the new Wengine
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycleStage.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycleStage.java?rev=1385358&r1=1385357&r2=1385358&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycleStage.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycleStage.java
Sun Sep 16 19:43:07 2012
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-
package org.apache.oodt.cas.workflow.lifecycle;
//JDK imports
@@ -23,110 +22,121 @@ import java.util.List;
import java.util.Vector;
/**
+ *
+ * A particular step (or Stage) in a {@link WorkflowLifecycle}
+ *
* @author mattmann
* @version $Revision$
*
- * <p>
- * A particular step (or Stage) in a {@link WorkflowLifecycle}
- * </p>.
*/
public class WorkflowLifecycleStage {
- private String name;
+ private String name;
- private int order;
+ private int order;
- private List<WorkflowState> states;
+ private List<WorkflowState> states;
- /**
- * Default Constructor.
- *
- */
- public WorkflowLifecycleStage() {
- states = new Vector<WorkflowState>();
- }
-
- /**
- * Constructs a new WorkflowLifecycleSage with the given parameters.
- *
- * @param name
- * The name of the WorkflowLifeCycleStage.
- * @param states
- * The {@link List} of String states that are part of this
- * particular stage.
- *
- * @param order
- * The ordering of this State in a {@List} of States that make up
- * a {@link WorkflowLifeCycle}.
- */
- public WorkflowLifecycleStage(String name, List<WorkflowState> states, int
order) {
- this.name = name;
- this.states = states;
- this.order = order;
- }
-
- /**
- * @return the name
- */
- public String getName() {
- return name;
- }
-
- /**
- * @param name
- * the name to set
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /**
- * @return the states
- */
- public List<WorkflowState> getStates() {
- return states;
- }
-
- /**
- * @param states
- * the states to set
- */
- public void setStates(List<WorkflowState> states) {
- this.states = states;
- }
-
- /**
- * @return the order
- */
- public int getOrder() {
- return order;
- }
-
- /**
- * @param order
- * the order to set
- */
- public void setOrder(int order) {
- this.order = order;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Object#hashCode()
- */
- public int hashCode() {
- return this.name.hashCode() + new Integer(this.order).hashCode();
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#equals(java.lang.Object)
- */
- @Override
- public boolean equals(Object stage) {
- return this.name.equals(((WorkflowLifecycleStage)stage).getName());
- }
-
-
+ /**
+ * Default Constructor.
+ *
+ */
+ public WorkflowLifecycleStage() {
+ states = new Vector<WorkflowState>();
+ }
+
+ /**
+ * Constructs a new WorkflowLifecycleStage with the given parameters.
+ *
+ * @param name
+ * The name of the WorkflowLifeCycleStage.
+ * @param states
+ * The {@link List} of String states that are part of this particular
+ * stage.
+ *
+ * @param order
+ * The ordering of this State in a {@link List} of States that make
+ * up a {@link WorkflowLifecycle}.
+ */
+ public WorkflowLifecycleStage(String name, List<WorkflowState> states,
+ int order) {
+ this.name = name;
+ this.states = states;
+ this.order = order;
+ }
+
+ /**
+ * @return the name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * @param name
+ * the name to set
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * @return the states
+ */
+ public List<WorkflowState> getStates() {
+ return states;
+ }
+
+ /**
+ * @param states
+ * the states to set
+ */
+ public void setStates(List<WorkflowState> states) {
+ this.states = states;
+ }
+
+ /**
+ * @return the order
+ */
+ public int getOrder() {
+ return order;
+ }
+
+ /**
+ * @param order
+ * the order to set
+ */
+ public void setOrder(int order) {
+ this.order = order;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return this.name.hashCode() + new Integer(this.order).hashCode();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object stage) {
+ return this.name.equals(((WorkflowLifecycleStage) stage).getName());
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return this.name;
+ }
}
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowState.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowState.java?rev=1385358&r1=1385357&r2=1385358&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowState.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowState.java
Sun Sep 16 19:43:07 2012
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -34,14 +34,12 @@ public class WorkflowState {
private String name;
private String description;
private String message;
- private Vector<WorkflowState> subStates;
private Date startTime;
private WorkflowLifecycleStage category;
private WorkflowState prevState;
public WorkflowState(){
- this.subStates = new Vector<WorkflowState>();
this.startTime = null;
this.name = null;
this.description = null;
@@ -80,7 +78,7 @@ public class WorkflowState {
}
public String toString() {
- return this.getName() + " : " + this.getMessage();
+ return this.getName() + " ["+this.getCategory()+"] : " +
this.getMessage();
}
/**
@@ -112,20 +110,6 @@ public class WorkflowState {
}
/**
- * @return the subStates
- */
- public Vector<WorkflowState> getSubStates() {
- return subStates;
- }
-
- /**
- * @param subStates the subStates to set
- */
- public void setSubStates(Vector<WorkflowState> subStates) {
- this.subStates = subStates;
- }
-
- /**
* @return the category
*/
public WorkflowLifecycleStage getCategory() {