Author: mattmann
Date: Thu Sep 20 06:35:44 2012
New Revision: 1387871
URL: http://svn.apache.org/viewvc?rev=1387871&view=rev
Log:
- fix for OODT-496: Convert EngineRunner interface to take TaskProcessor
Added:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AbstractEngineRunnerBase.java
Modified:
oodt/trunk/CHANGES.txt
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunnerFactory.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
Modified: oodt/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Thu Sep 20 06:35:44 2012
@@ -4,6 +4,8 @@ Apache OODT Change Log
Release 0.5
--------------------------------------------
+* OODT-496: Convert EngineRunner interface to take TaskProcessor (mattmann)
+
* OODT-505: Remove synchronous Runner (mattmann)
* OODT-498: Overwrite and bring back 0.3 ThreadPoolWorkflowEngine plus
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
Thu Sep 20 06:35:44 2012
@@ -115,7 +115,7 @@ public class TaskQuerier implements Runn
&& !processor.isAnyState("Executing")
&& processor.getRunnableWorkflowProcessors().size() > 0) {
for (TaskProcessor tp : processor.getRunnableWorkflowProcessors()) {
- WorkflowState state = lifecycle.createState("Executing", "running",
+ WorkflowState state = lifecycle.createState("WaitingOnResources",
"waiting",
"Added to Runnable queue");
tp.getWorkflowInstance().setState(state);
persist(tp.getWorkflowInstance());
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
Thu Sep 20 06:35:44 2012
@@ -78,17 +78,14 @@ public class TaskRunner implements Runna
*/
@Override
public void run() {
- WorkflowTask nextTask = null;
TaskProcessor nextTaskProcessor = null;
while (running) {
nextTaskProcessor = taskQuerier.getNext();
- nextTask = nextTaskProcessor != null ?
extractTaskFromProcessor(nextTaskProcessor)
- : null;
try {
- if (nextTaskProcessor != null && runner.hasOpenSlots(nextTask)) {
- runner.execute(nextTask,
nextTaskProcessor.getWorkflowInstance().getSharedContext());
+ if (nextTaskProcessor != null &&
runner.hasOpenSlots(nextTaskProcessor)) {
+ runner.execute(nextTaskProcessor);
}
} catch (Exception e) {
e.printStackTrace();
@@ -96,9 +93,8 @@ public class TaskRunner implements Runna
Level.SEVERE,
"Engine failed while submitting jobs to its runner : "
+ e.getMessage(), e);
- if (nextTask != null) {
+ if (nextTaskProcessor != null) {
this.flagProcessorAsFailed(nextTaskProcessor, e.getMessage());
- nextTask = null;
nextTaskProcessor = null;
}
}
@@ -140,6 +136,7 @@ public class TaskRunner implements Runna
.getDefaultLifecycle()
.createState("Failure", "done",
"Failed while submitting job to Runner : " + msg));
+ //TODO: persist me?
}
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessor.java
Thu Sep 20 06:35:44 2012
@@ -281,9 +281,7 @@ public abstract class WorkflowProcessor
+ "running preconditiosn for workflow instance: ["
+ this.workflowInstance.getId() + "]");
} else {
- if (this.getRunnableWorkflowProcessors() != null
- && this.getRunnableWorkflowProcessors().size() == 0
- && this.passedPostConditions()) {
+ if (this.isDone().getName().equals("ResultsSuccess")) {
nextState = this.helper.getLifecycleForProcessor(this).createState(
"Success",
"done",
@@ -293,11 +291,13 @@ public abstract class WorkflowProcessor
}
}
} else if (currState.getName().equals("Executing")) {
+ if(this.isDone().getName().equals("ResultsSuccess")){
nextState = this.helper.getLifecycleForProcessor(this).createState(
"Success",
"done",
"Workflow Processor: nextState: " + "workflow instance: ["
+ this.workflowInstance.getId() + "] completed successfully");
+ }
}
if (nextState != null) {
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java
Thu Sep 20 06:35:44 2012
@@ -36,6 +36,7 @@ import org.apache.oodt.cas.workflow.repo
import org.apache.oodt.cas.workflow.structs.Graph;
import org.apache.oodt.cas.workflow.structs.ParentChildWorkflow;
import org.apache.oodt.cas.workflow.structs.Workflow;
+import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
import org.apache.oodt.cas.workflow.structs.WorkflowInstancePage;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
@@ -56,11 +57,11 @@ public class WorkflowProcessorQueue {
.getLogger(WorkflowProcessorQueue.class.getName());
private WorkflowInstanceRepository repo;
-
+
private WorkflowRepository modelRepo;
private WorkflowLifecycleManager lifecycle;
-
+
private Map<String, WorkflowProcessor> processorCache;
public WorkflowProcessorQueue(WorkflowInstanceRepository repo,
@@ -88,11 +89,11 @@ public class WorkflowProcessorQueue {
}
List<WorkflowProcessor> processors = new Vector<WorkflowProcessor>(
- page.getPageWorkflows() != null ? page.getPageWorkflows().size():0);
+ page.getPageWorkflows() != null ? page.getPageWorkflows().size() : 0);
for (WorkflowInstance inst : (List<WorkflowInstance>) (List<?>) page
.getPageWorkflows()) {
- if(!inst.getState().getCategory().getName().equals("done")){
- processors.add(fromWorkflowInstance(inst));
+ if (!inst.getState().getCategory().getName().equals("done")) {
+ processors.add(fromWorkflowInstance(inst));
}
}
@@ -101,82 +102,90 @@ public class WorkflowProcessorQueue {
private WorkflowProcessor fromWorkflowInstance(WorkflowInstance inst) {
WorkflowProcessor processor = null;
- if(processorCache.containsKey(inst.getId())){
+ if (processorCache.containsKey(inst.getId())) {
return processorCache.get(inst.getId());
- }
- else{
+ } else {
if (inst.getParentChildWorkflow().getTasks() != null
&& inst.getParentChildWorkflow().getTasks().size() > 1) {
- processor = new SequentialProcessor(lifecycle, inst);
- WorkflowState seqProcessorState =
- getLifecycle(inst.getParentChildWorkflow()).
- createState("Loaded", "initial", "Sequential Workflow instance
with id: ["
- + inst.getId()+"] loaded by processor queue.");
+ processor = getProcessorFromInstanceGraph(inst, lifecycle);
+ WorkflowState seqProcessorState = getLifecycle(
+ inst.getParentChildWorkflow()).createState(
+ "Loaded",
+ "initial",
+ "Sequential Workflow instance with id: [" + inst.getId()
+ + "] loaded by processor queue.");
inst.setState(seqProcessorState);
- persist(inst);
-
+ persist(inst);
+
+ for (WorkflowCondition cond : inst.getParentChildWorkflow()
+ .getPreConditions()) {
+
+ }
+
for (WorkflowTask task : inst.getParentChildWorkflow().getTasks()) {
WorkflowInstance instance = new WorkflowInstance();
- WorkflowState taskWorkflowState =
- lifecycle.getDefaultLifecycle().createState("Null", "initial",
- "Sub Task Workflow created by Workflow Processor Queue for
workflow instance: " +
- "["+inst.getId()+"]");
+ WorkflowState taskWorkflowState = lifecycle.getDefaultLifecycle()
+ .createState(
+ "Null",
+ "initial",
+ "Sub Task Workflow created by Workflow Processor Queue for
workflow instance: "
+ + "[" + inst.getId() + "]");
instance.setState(taskWorkflowState);
instance.setPriority(inst.getPriority());
instance.setCurrentTaskId(task.getTaskId());
ParentChildWorkflow workflow = new ParentChildWorkflow(new Graph());
String taskWorkflowId = UUID.randomUUID().toString();
- workflow.setId("task-workflow-"+ taskWorkflowId);
- workflow.setName("Task Workflow-"+task.getTaskName());
+ workflow.setId("task-workflow-" + taskWorkflowId);
+ workflow.setName("Task Workflow-" + task.getTaskName());
workflow.getTasks().add(task);
workflow.getGraph().setTask(task);
instance.setId(taskWorkflowId);
instance.setParentChildWorkflow(workflow);
- this.addToModelRepo(workflow);
+ this.addToModelRepo(workflow);
persist(inst);
WorkflowProcessor subProcessor = fromWorkflowInstance(instance);
- processor.getSubProcessors().add(subProcessor);
- }
- }
- else{
+ processor.getSubProcessors().add(subProcessor);
+ }
+ } else {
processor = new TaskProcessor(lifecycle, inst);
- WorkflowState taskProcessorState =
- getLifecycle(inst.getParentChildWorkflow()).
- createState("Loaded", "initial", "Task Workflow instance with id:
["
- + inst.getId()+"] loaded by processor queue.");
- inst.setState(taskProcessorState);
+ WorkflowState taskProcessorState = getLifecycle(
+ inst.getParentChildWorkflow()).createState(
+ "Loaded",
+ "initial",
+ "Task Workflow instance with id: [" + inst.getId()
+ + "] loaded by processor queue.");
+ inst.setState(taskProcessorState);
persist(inst);
}
-
- synchronized(processorCache){
+
+ synchronized (processorCache) {
processorCache.put(inst.getId(), processor);
}
- return processor;
+ return processor;
}
}
-
- private void addToModelRepo(Workflow workflow){
- if(modelRepo != null){
+
+ private void addToModelRepo(Workflow workflow) {
+ if (modelRepo != null) {
try {
modelRepo.addWorkflow(workflow);
} catch (RepositoryException e) {
e.printStackTrace();
}
- }
+ }
}
-
- private void persist(WorkflowInstance instance){
+
+ private void persist(WorkflowInstance instance) {
try {
this.repo.updateWorkflowInstance(instance);
} catch (Exception e) {
e.printStackTrace();
- LOG.log(
- Level.WARNING,
+ LOG.log(Level.WARNING,
"Unable to update workflow instance: [" + instance.getId()
- + "] with status: [" + instance.getState().getName() + "]:
Message: "
- + e.getMessage());
- }
+ + "] with status: [" + instance.getState().getName()
+ + "]: Message: " + e.getMessage());
+ }
}
private WorkflowLifecycle getLifecycle(Workflow workflow) {
@@ -184,4 +193,15 @@ public class WorkflowProcessorQueue {
.getLifecycleForWorkflow(workflow) : lifecycle.getDefaultLifecycle();
}
+ private WorkflowProcessor getProcessorFromInstanceGraph(
+ WorkflowInstance instance, WorkflowLifecycleManager lifecycle) {
+ Graph graph = instance.getParentChildWorkflow().getGraph();
+ if (graph != null && graph.getExecutionType() != null
+ && graph.getExecutionType().equals("sequential")) {
+ return new SequentialProcessor(lifecycle, instance);
+ } else {
+ return new ParallelProcessor(lifecycle, instance);
+ }
+ }
+
}
Added:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AbstractEngineRunnerBase.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AbstractEngineRunnerBase.java?rev=1387871&view=auto
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AbstractEngineRunnerBase.java
(added)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AbstractEngineRunnerBase.java
Thu Sep 20 06:35:44 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine.runner;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycle;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+
+/**
+ *
+ * An abstract base class providing helper functionality to persist
+ * {@link WorkflowInstance}s, to get {@link WorkflowLifecycle}s from underlying
+ * {@link TaskProcessor}s, and to get {@link WorkflowTask}s from the underlying
+ * {@link TaskProcessor}.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public abstract class AbstractEngineRunnerBase extends EngineRunner {
+
+ protected final WorkflowInstanceRepository instRep;
+
+ private static final Logger LOG = Logger
+ .getLogger(AbstractEngineRunnerBase.class.getName());
+
+ /**
+ * Creates a new AbsractEngineRunnerBase with the provided
+ * {@link WorkflowInstanceRepository}.
+ *
+ * @param instRep
+ * The {@link WorkflowInstanceRepository} to use to persist
+ * {@link TaskProcessor} {@link WorkflowInstance} information.
+ */
+ public AbstractEngineRunnerBase(WorkflowInstanceRepository instRep) {
+ this.instRep = instRep;
+ }
+
+ protected WorkflowTask getTaskFromProcessor(TaskProcessor taskProcessor) {
+ if (taskProcessor.getWorkflowInstance() != null
+ && taskProcessor.getWorkflowInstance().getParentChildWorkflow() != null
+ && taskProcessor.getWorkflowInstance().getParentChildWorkflow()
+ .getGraph() != null) {
+ if (taskProcessor.getWorkflowInstance().getParentChildWorkflow()
+ .getGraph().getTask() != null) {
+ return taskProcessor.getWorkflowInstance().getParentChildWorkflow()
+ .getGraph().getTask();
+ } else
+ return taskProcessor.getWorkflowInstance().getParentChildWorkflow()
+ .getTasks().get(0);
+ } else
+ return taskProcessor.getWorkflowInstance().getParentChildWorkflow()
+ .getTasks().get(0);
+ }
+
+ protected WorkflowLifecycle getLifecycle(TaskProcessor taskProcessor) {
+ return taskProcessor.getLifecycleManager().getDefaultLifecycle();
+ }
+
+ protected void persist(WorkflowInstance instance) {
+ try {
+ instRep.updateWorkflowInstance(instance);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING, "Unabled to persist workflow instance: ["
+ + instance.getId() + "]: Message: " + e.getMessage());
+ }
+ }
+
+}
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java
Thu Sep 20 06:35:44 2012
@@ -27,18 +27,21 @@ import java.util.logging.Level;
import java.util.logging.Logger;
//OODT imports
-import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycle;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowState;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
/**
- * Runs a local version of a {@link WorkflowTask} asynchronously.
- *
+ * Runs a local version of a {@link TaskProcessor} asynchronously.
+ *
* @author mattmann (Chris Mattmann)
* @author bfoster (Brian Foster)
*/
-public class AsynchronousLocalEngineRunner extends EngineRunner {
+public class AsynchronousLocalEngineRunner extends AbstractEngineRunnerBase {
private static final Logger LOG = Logger
.getLogger(AsynchronousLocalEngineRunner.class.getName());
@@ -49,44 +52,58 @@ public class AsynchronousLocalEngineRunn
private final Map<String, Thread> workerMap;
public AsynchronousLocalEngineRunner() {
- this(DEFAULT_NUM_THREADS);
+ this(DEFAULT_NUM_THREADS, null);
}
- public AsynchronousLocalEngineRunner(int numThreads) {
- this.executor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS);
+ public AsynchronousLocalEngineRunner(int numThreads,
+ WorkflowInstanceRepository instRep) {
+ super(instRep);
+ this.executor = Executors.newFixedThreadPool(numThreads);
this.workerMap = new HashMap<String, Thread>();
}
/*
* (non-Javadoc)
- *
+ *
* @see
- * org.apache.oodt.cas.workflow.engine.EngineRunner#execute(org.apache.oodt
- * .cas.workflow.structs.WorkflowTask, org.apache.oodt.cas.metadata.Metadata)
+ * org.apache.oodt.cas.workflow.engine.runner.EngineRunner#execute(org.apache
+ * .oodt.cas.workflow.engine.processor.TaskProcessor)
*/
@Override
- public void execute(final WorkflowTask workflowTask,
- final Metadata dynMetadata) throws Exception {
+ public void execute(final TaskProcessor taskProcessor) throws Exception {
Thread worker = new Thread() {
@Override
public void run() {
+ WorkflowLifecycle lifecycle = getLifecycle(taskProcessor);
+ WorkflowTask workflowTask = getTaskFromProcessor(taskProcessor);
WorkflowTaskInstance inst = GenericWorkflowObjectFactory
.getTaskObjectFromClassName(workflowTask.getTaskInstanceClassName());
try {
- inst.run(dynMetadata, workflowTask.getTaskConfig());
+ inst.run(taskProcessor.getWorkflowInstance().getSharedContext(),
+ workflowTask.getTaskConfig());
+ String msg = "Task: [" + workflowTask.getTaskName()
+ + "] for instance id: ["
+ + taskProcessor.getWorkflowInstance().getId()
+ + "] completed successfully";
+ WorkflowState state = lifecycle.createState("ExecutionComplete",
"transition", msg);
+ taskProcessor.getWorkflowInstance().setState(state);
+ persist(taskProcessor.getWorkflowInstance());
} catch (Exception e) {
e.printStackTrace();
- LOG.log(Level.WARNING,
- "Exception executing task: [" + workflowTask.getTaskName()
- + "]: Message: " + e.getMessage());
+ String msg = "Exception executing task: ["
+ + workflowTask.getTaskName() + "]: Message: " + e.getMessage();
+ LOG.log(Level.WARNING, msg);
+ WorkflowState state = lifecycle.createState("Failure", "done", msg);
+ taskProcessor.getWorkflowInstance().setState(state);
+ persist(taskProcessor.getWorkflowInstance());
}
}
/*
* (non-Javadoc)
- *
+ *
* @see java.lang.Thread#interrupt()
*/
@SuppressWarnings("deprecation")
@@ -108,7 +125,7 @@ public class AsynchronousLocalEngineRunn
/*
* (non-Javadoc)
- *
+ *
* @see org.apache.oodt.cas.workflow.engine.EngineRunner#shutdown()
*/
@Override
@@ -122,11 +139,15 @@ public class AsynchronousLocalEngineRunn
}
- /* (non-Javadoc)
- * @see
org.apache.oodt.cas.workflow.engine.EngineRunner#hasOpenSlots(org.apache.oodt.cas.workflow.structs.WorkflowTask)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.runner.EngineRunner#hasOpenSlots(org
+ * .apache.oodt.cas.workflow.engine.processor.TaskProcessor)
*/
@Override
- public boolean hasOpenSlots(WorkflowTask workflowTask) throws Exception {
+ public boolean hasOpenSlots(TaskProcessor taskProcessor) throws Exception {
// TODO Auto-generated method stub
return true;
}
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunnerFactory.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunnerFactory.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunnerFactory.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunnerFactory.java
Thu Sep 20 06:35:44 2012
@@ -16,29 +16,44 @@
*/
package org.apache.oodt.cas.workflow.engine.runner;
+//OODT imports
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+
/**
- * A {@link EngineRunnerFactory} which creates {@link
AsynchronousLocalEngineRunner}s.
- *
+ * A {@link EngineRunnerFactory} which creates
+ * {@link AsynchronousLocalEngineRunner}s.
+ *
* @author bfoster (Brian Foster)
+ * @author mattmann (Chris Mattmann)
*/
public class AsynchronousLocalEngineRunnerFactory implements
- EngineRunnerFactory {
-
- private static final String NUM_THREADS_PROPERTY =
"org.apache.oodt.cas.workflow.wengine.asynchronous.runner.num.threads";
+ EngineRunnerFactory {
- private int numThreads;
+ private static final String NUM_THREADS_PROPERTY =
"org.apache.oodt.cas.workflow.wengine.asynchronous.runner.num.threads";
- public AsynchronousLocalEngineRunnerFactory() {
- numThreads = Integer.getInteger(NUM_THREADS_PROPERTY,
- AsynchronousLocalEngineRunner.DEFAULT_NUM_THREADS);
- }
+ private static final String INSTANCE_REPO_FACTORY_PROPERTY =
"workflow.engine.instanceRep.factory";
- @Override
- public AsynchronousLocalEngineRunner createEngineRunner() {
- return new AsynchronousLocalEngineRunner(numThreads);
- }
+ private int numThreads;
- public void setNumThreads(int numThreads) {
- this.numThreads = numThreads;
- }
+ public AsynchronousLocalEngineRunnerFactory() {
+ numThreads = Integer.getInteger(NUM_THREADS_PROPERTY,
+ AsynchronousLocalEngineRunner.DEFAULT_NUM_THREADS);
+ }
+
+ @Override
+ public AsynchronousLocalEngineRunner createEngineRunner() {
+ return new AsynchronousLocalEngineRunner(numThreads,
+ getWorkflowInstanceRepository());
+ }
+
+ public void setNumThreads(int numThreads) {
+ this.numThreads = numThreads;
+ }
+
+ protected WorkflowInstanceRepository getWorkflowInstanceRepository() {
+ return GenericWorkflowObjectFactory
+ .getWorkflowInstanceRepositoryFromClassName(System
+ .getProperty(INSTANCE_REPO_FACTORY_PROPERTY));
+ }
}
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
Thu Sep 20 06:35:44 2012
@@ -18,6 +18,7 @@ package org.apache.oodt.cas.workflow.eng
//OODT imports
import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
/**
@@ -33,19 +34,17 @@ import org.apache.oodt.cas.workflow.stru
public abstract class EngineRunner {
/**
- * Executes a {@link WorkflowTask} on an execution substrate. Ideally there
+ * Executes a {@link TaskProcessor} on an execution substrate. Ideally there
* will only ever be two of these substrates, one for local execution, and
* another for communication with the Resource Manager.
*
- * @param workflowTask
- * The model of the {@link WorkflowTask} to instantiate and execute.
- * @param dynMetadata
- * The dynamic {@link Metadata} passed to this {@link WorkflowTask}.
+ * @param taskProcessor
+ * The {@link TaskProcessor} to instantiate and execute.
*
* @throws Exception
* If any error occurs.
*/
- public abstract void execute(WorkflowTask workflowTask, Metadata dynMetadata)
+ public abstract void execute(TaskProcessor taskProcessor)
throws Exception;
/**
@@ -59,12 +58,12 @@ public abstract class EngineRunner {
/**
* Decides whether or not there are available slots within this runner
- * to execute the provided {@link WorkflowTask}.
+ * to execute the provided {@link TaskProcessor}.
*
- * @param workflowTask The {@link WorkflowTask} to execute.
+ * @param workflowTask The {@link TaskProcessor} to execute.
* @return True if there is an open slot, false otherwise.
* @throws Exception If any error occurs.
*/
- public abstract boolean hasOpenSlots(WorkflowTask workflowTask) throws
Exception;
+ public abstract boolean hasOpenSlots(TaskProcessor taskProcessor) throws
Exception;
}
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
Thu Sep 20 06:35:44 2012
@@ -23,10 +23,11 @@ import java.util.logging.Level;
import java.util.logging.Logger;
//OODT imports
-import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.resource.structs.Job;
import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
import org.apache.oodt.cas.workflow.structs.TaskJobInput;
import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
@@ -40,7 +41,7 @@ import org.apache.oodt.cas.workflow.stru
* @version $Revision$
*
*/
-public class ResourceRunner extends EngineRunner implements CoreMetKeys,
+public class ResourceRunner extends AbstractEngineRunnerBase implements
CoreMetKeys,
WorkflowStatus {
private static final Logger LOG = Logger.getLogger(ResourceRunner.class
@@ -52,21 +53,18 @@ public class ResourceRunner extends Engi
private String currentJobId;
- public ResourceRunner(URL resUrl) {
+ public ResourceRunner(URL resUrl, WorkflowInstanceRepository instRep) {
+ super(instRep);
this.rClient = new XmlRpcResourceManagerClient(resUrl);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.oodt.cas.workflow.engine.EngineRunner#execute(org.apache.oodt
- * .cas.workflow.structs.WorkflowTask, org.apache.oodt.cas.metadata.Metadata)
+ /* (non-Javadoc)
+ * @see
org.apache.oodt.cas.workflow.engine.runner.EngineRunner#execute(org.apache.oodt.cas.workflow.engine.processor.TaskProcessor)
*/
@Override
- public void execute(WorkflowTask workflowTask, Metadata dynMetadata)
- throws Exception {
+ public void execute(TaskProcessor taskProcessor) throws Exception {
Job workflowTaskJob = new Job();
+ WorkflowTask workflowTask = getTaskFromProcessor(taskProcessor);
workflowTaskJob.setName(workflowTask.getTaskId());
workflowTaskJob
.setJobInstanceClassName("org.apache.oodt.cas.workflow.structs.TaskJob");
@@ -83,7 +81,7 @@ public class ResourceRunner extends Engi
}
TaskJobInput in = new TaskJobInput();
- in.setDynMetadata(dynMetadata);
+ in.setDynMetadata(taskProcessor.getWorkflowInstance().getSharedContext());
in.setTaskConfig(workflowTask.getTaskConfig());
in.setWorkflowTaskInstanceClassName(workflowTask.getTaskInstanceClassName());
@@ -110,10 +108,10 @@ public class ResourceRunner extends Engi
/* (non-Javadoc)
- * @see
org.apache.oodt.cas.workflow.engine.EngineRunner#hasOpenSlots(org.apache.oodt.cas.workflow.structs.WorkflowTask)
+ * @see
org.apache.oodt.cas.workflow.engine.runner.EngineRunner#hasOpenSlots(org.apache.oodt.cas.workflow.engine.processor.TaskProcessor)
*/
@Override
- public boolean hasOpenSlots(WorkflowTask workflowTask) throws Exception {
+ public boolean hasOpenSlots(TaskProcessor taskProcessor) throws Exception {
// TODO Auto-generated method stub
return false;
}
@@ -141,4 +139,5 @@ public class ResourceRunner extends Engi
return false;
}
+
}
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java
Thu Sep 20 06:35:44 2012
@@ -22,6 +22,9 @@ import java.net.URL;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+
//Google imports
import com.google.common.base.Preconditions;
@@ -30,6 +33,7 @@ import com.google.common.base.Preconditi
* Factory which creates {@link ResourceRunner}s.
*
* @author bfoster (Brian Foster)
+ * @author mattmann (Chris Mattmann)
*/
public class ResourceRunnerFactory implements EngineRunnerFactory{
@@ -37,6 +41,8 @@ public class ResourceRunnerFactory imple
private static final String RESOURCE_MANAGER_URL_PROPERTY =
"org.apache.oodt.cas.workflow.engine.resourcemgr.url";
+ private static final String INSTANCE_REPO_FACTORY_PROPERTY =
"workflow.engine.instanceRep.factory";
+
private String resUrl;
public ResourceRunnerFactory() {
@@ -49,7 +55,7 @@ public class ResourceRunnerFactory imple
Preconditions.checkNotNull(resUrl,
"Must specify Resource Manager URL [property = "
+ RESOURCE_MANAGER_URL_PROPERTY + "]");
- return new ResourceRunner(new URL(resUrl));
+ return new ResourceRunner(new URL(resUrl),
getWorkflowInstanceRepository());
} catch (MalformedURLException e) {
LOG.log(Level.SEVERE, "Failed to load ResourceRunner : " +
e.getMessage(), e);
return null;
@@ -59,4 +65,10 @@ public class ResourceRunnerFactory imple
public void setResourceManagerUrl(String resUrl) {
this.resUrl = resUrl;
}
+
+ protected WorkflowInstanceRepository getWorkflowInstanceRepository() {
+ return GenericWorkflowObjectFactory
+ .getWorkflowInstanceRepositoryFromClassName(System
+ .getProperty(INSTANCE_REPO_FACTORY_PROPERTY));
+ }
}
Modified:
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java?rev=1387871&r1=1387870&r2=1387871&view=diff
==============================================================================
---
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
(original)
+++
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
Thu Sep 20 06:35:44 2012
@@ -29,8 +29,9 @@ import org.apache.commons.io.FileUtils;
//OODT imports
import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
import
org.apache.oodt.cas.workflow.engine.runner.AsynchronousLocalEngineRunner;
-import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.Priority;
import org.apache.oodt.commons.date.DateUtils;
import org.apache.oodt.commons.util.DateConvert;
@@ -54,17 +55,31 @@ public class TestAsynchronousLocalEngine
private AsynchronousLocalEngineRunner runner;
protected File testDir;
-
+
private QuerierAndRunnerUtils utils;
public void testRun() {
- WorkflowTask task = utils.getTask(testDir);
+ TaskProcessor taskProcessor1 = null;
+ TaskProcessor taskProcessor2 = null;
+
+ try {
+ taskProcessor1 = (TaskProcessor) utils.getProcessor(Priority.getDefault()
+ .getValue(), "Executing", "running");
+ taskProcessor2 = (TaskProcessor) utils.getProcessor(Priority.getDefault()
+ .getValue(), "Executing", "running");
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+
Metadata met = new Metadata();
- met.addMetadata("StartDateTime",
DateUtils.toString(Calendar.getInstance()));
+ met.addMetadata("StartDateTime",
DateUtils.toString(Calendar.getInstance()));
+
+ taskProcessor1.getWorkflowInstance().getSharedContext().addMetadata(met);
+ taskProcessor2.getWorkflowInstance().getSharedContext().addMetadata(met);
try {
- runner.execute(task, met);
- runner.execute(task, met);
+ runner.execute(taskProcessor1);
+ runner.execute(taskProcessor2);
assertTrue(ranFast());
} catch (Exception e) {
e.printStackTrace();
@@ -80,7 +95,8 @@ public class TestAsynchronousLocalEngine
try {
String line = FileUtils.readFileToString(f);
String[] toks = line.split(",");
- assertEquals("Toks not equal to 2: toks=["+Arrays.asList(toks)+"]", 2,
toks.length);
+ assertEquals("Toks not equal to 2: toks=[" + Arrays.asList(toks) + "]",
+ 2, toks.length);
Date dateTime = DateConvert.isoParse(toks[1]);
Seconds seconds = Seconds.secondsBetween(new DateTime(dateTime),
new DateTime());
@@ -114,8 +130,9 @@ public class TestAsynchronousLocalEngine
*/
@Override
protected void setUp() throws Exception {
- String parentPath = File.createTempFile("test",
"txt").getParentFile().getAbsolutePath();
- parentPath = parentPath.endsWith("/") ? parentPath:parentPath + "/";
+ String parentPath = File.createTempFile("test", "txt").getParentFile()
+ .getAbsolutePath();
+ parentPath = parentPath.endsWith("/") ? parentPath : parentPath + "/";
String testJobDirPath = parentPath + "jobs";
testDir = new File(testJobDirPath);
testDir.mkdirs();