Author: mattmann
Date: Mon May 28 05:19:08 2012
New Revision: 1343141
URL: http://svn.apache.org/viewvc?rev=1343141&view=rev
Log:
- progress towards OODT-310: WIP: Port WEngine to trunk
Added:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ParallelProcessor.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ParallelProcessor.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ParallelProcessor.java?rev=1343141&r1=1343140&r2=1343141&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ParallelProcessor.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ParallelProcessor.java
Mon May 28 05:19:08 2012
@@ -33,6 +33,10 @@ import org.apache.oodt.cas.workflow.util
*
*/
public class ParallelProcessor extends WorkflowProcessor {
+
+ public ParallelProcessor(){
+ this(null);
+ }
public ParallelProcessor(WorkflowLifecycleManager lifecycleMgr) {
super(lifecycleMgr);
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java?rev=1343141&r1=1343140&r2=1343141&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java
Mon May 28 05:19:08 2012
@@ -33,6 +33,10 @@ import org.apache.oodt.cas.workflow.life
*/
public class SequentialProcessor extends WorkflowProcessor {
+ public SequentialProcessor(){
+ this(null);
+ }
+
public SequentialProcessor(WorkflowLifecycleManager lifecycleManager){
super(lifecycleManager);
}
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java?rev=1343141&r1=1343140&r2=1343141&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java
Mon May 28 05:19:08 2012
@@ -40,6 +40,10 @@ public class TaskProcessor extends Workf
private Class<? extends WorkflowTaskInstance> instanceClass;
private String jobId;
+
+ public TaskProcessor(){
+ this(null);
+ }
public TaskProcessor(WorkflowLifecycleManager lifecycleManager) {
super(lifecycleManager);
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java?rev=1343141&r1=1343140&r2=1343141&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
Mon May 28 05:19:08 2012
@@ -79,26 +79,25 @@ public class TaskQuerier implements Runn
public void run() {
while (running) {
List<WorkflowProcessor> processors = processorQueue.getProcessors();
- synchronized (runnableProcessors) {
- runnableProcessors.clear();
- }
+ List<WorkflowProcessor> processorsToRun = new
Vector<WorkflowProcessor>();
+
for (WorkflowProcessor processor : processors) {
// OK now get its lifecycle
WorkflowLifecycle lifecycle = getLifecycleForProcessor(processor);
if (!(processor.getState().getCategory().getName().equals("done") ||
processor
.getState().getCategory().getName().equals("holding"))) {
- for (TaskProcessor tp : processor.getRunnableWorkflowProcessors()) {
- synchronized (runnableProcessors) {
+ for (TaskProcessor tp : processor.getRunnableWorkflowProcessors())
{
tp.setState(lifecycle.createState("Executing", "running",
"Added to Runnable queue"));
- runnableProcessors.add(processor);
+ System.out.println("Added processor with priority:
["+tp.getPriority()+"]");
+ processorsToRun.add(processor);
+ }
+
+ prioritizer.sort(processorsToRun);
+
+ synchronized(runnableProcessors){
+ runnableProcessors = processorsToRun;
}
- }
-
- // now prioritize the runnable processors
- synchronized (runnableProcessors) {
- prioritizer.sort(runnableProcessors);
- }
} else {
continue;
@@ -125,7 +124,7 @@ public class TaskQuerier implements Runn
/**
* @return the runnableProcessors
*/
- public synchronized List<WorkflowProcessor> getRunnableProcessors() {
+ public List<WorkflowProcessor> getRunnableProcessors() {
return runnableProcessors;
}
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java?rev=1343141&r1=1343140&r2=1343141&view=diff
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
(original)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
Mon May 28 05:19:08 2012
@@ -73,6 +73,7 @@ public abstract class WorkflowProcessor
protected WorkflowLifecycleManager lifecycleManager;
public WorkflowProcessor(WorkflowLifecycleManager lifecycleManager) {
+ this.subProcessors = new Vector<WorkflowProcessor>();
this.listeners = new Vector<WorkflowProcessorListener>();
this.ProcessorDateTimeInfo = new ProcessorDateTimeInfo();
this.staticMetadata = new Metadata();
Added:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java?rev=1343141&view=auto
==============================================================================
---
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java
(added)
+++
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java
Mon May 28 05:19:08 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imorts
+import java.util.List;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.workflow.structs.Priority;
+
+//Google imports
+import com.google.common.collect.Lists;
+
+/**
+ *
+ * Builds {@link WorkflowProcessor}s.
+ *
+ * @author bfoster
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class WorkflowProcessorBuilder {
+
+ private String id;
+ private double priority;
+ private List<WorkflowProcessor> subProcessors;
+ private WorkflowLifecycleManager lifecycleManager;
+
+ private WorkflowProcessorBuilder() {
+ subProcessors = Lists.newArrayList();
+ }
+
+ public static WorkflowProcessorBuilder aWorkflowProcessor() {
+ return new WorkflowProcessorBuilder();
+ }
+
+ public WorkflowProcessorBuilder withId(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public WorkflowProcessorBuilder withLifecycleManager(
+ WorkflowLifecycleManager lifecycleManager) {
+ this.lifecycleManager = lifecycleManager;
+ return this;
+ }
+
+ public WorkflowProcessorBuilder withPriority(double priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ public WorkflowProcessorBuilder with(WorkflowProcessorBuilder wpb,
+ Class<? extends WorkflowProcessor> clazz) throws InstantiationException,
+ IllegalAccessException {
+ subProcessors.add(wpb.build(clazz));
+ return this;
+ }
+
+ public WorkflowProcessor build(Class<? extends WorkflowProcessor> clazz)
+ throws InstantiationException, IllegalAccessException {
+ WorkflowProcessor wp = clazz.newInstance();
+ wp.getWorkflowInstance().setId(id);
+ wp.setLifecycleManager(lifecycleManager);
+ wp.setPriority(Priority.getPriority(priority));
+ wp.setSubProcessors(subProcessors);
+ return wp;
+ }
+}
\ No newline at end of file
Added:
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java?rev=1343141&view=auto
==============================================================================
---
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
(added)
+++
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
Mon May 28 05:19:08 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Vector;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.workflow.structs.FILOPrioritySorter;
+import org.apache.oodt.cas.workflow.structs.Priority;
+import org.apache.oodt.cas.workflow.structs.Workflow;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+
+//Junit imports
+import junit.framework.TestCase;
+
+/**
+ *
+ * Test harness for the {@link TestTaskQuerier}.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class TestTaskQuerier extends TestCase {
+
+ private int dateGen;
+
+ public TestTaskQuerier() {
+ this.dateGen = 0;
+ }
+
+ public void testGetRunnableProcessors() {
+ FILOPrioritySorter prioritizer = new FILOPrioritySorter();
+ MockProcessorQueue processorQueue = new MockProcessorQueue();
+ assertNotNull(processorQueue.getProcessors());
+ assertEquals(3, processorQueue.getProcessors().size());
+ TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer);
+ Thread querierThread = new Thread(querier);
+ querierThread.start();
+ while (querier.getRunnableProcessors().size() != 2) {
+ assertNotNull(querier.getRunnableProcessors());
+ }
+
+ querier.setRunning(false);
+ assertNotNull(querier.getRunnableProcessors());
+ assertEquals(2, querier.getRunnableProcessors().size());
+ assertNotNull(querier.getRunnableProcessors().get(0));
+ assertNotNull(querier.getRunnableProcessors().get(0).getPriority());
+ assertEquals(2.0, querier.getRunnableProcessors().get(0).getPriority()
+ .getValue());
+ assertEquals(7.0, querier.getRunnableProcessors().get(1).getPriority()
+ .getValue());
+ try{
+ querierThread.join();
+ }
+ catch(InterruptedException ignore){}
+
+ }
+
+ private WorkflowProcessor getProcessor(double priority, String stateName,
+ String categoryName) throws InstantiationException,
IllegalAccessException {
+ WorkflowLifecycleManager lifecycleManager = new WorkflowLifecycleManager(
+ "./src/main/resources/examples/wengine/wengine-lifecycle.xml");
+ WorkflowInstance inst = new WorkflowInstance();
+ Date sd = new Date();
+ sd.setTime(sd.getTime() + (this.dateGen * 5000));
+ this.dateGen++;
+ inst.setStartDate(sd);
+ inst.setId("winst-" + priority);
+ Workflow workflow = new Workflow();
+ workflow.setTasks(Collections.EMPTY_LIST);
+ inst.setWorkflow(workflow);
+ inst.setPriority(Priority.getPriority(priority));
+ WorkflowProcessorBuilder builder =
WorkflowProcessorBuilder.aWorkflowProcessor()
+ .withLifecycleManager(lifecycleManager)
+ .withPriority(priority);
+ SequentialProcessor processor =
(SequentialProcessor)builder.build(SequentialProcessor.class);
+ processor.setWorkflowInstance(inst);
+ processor.setState(lifecycleManager.getDefaultLifecycle().createState(
+ stateName, categoryName, ""));
+ assertNotNull(processor.getState());
+ assertNotNull(processor.getState().getCategory());
+ assertNotNull(processor.getState().getCategory().getName());
+ List<WorkflowProcessor> runnables = new Vector<WorkflowProcessor>();
+ TaskProcessor taskProcessor =
(TaskProcessor)builder.build(TaskProcessor.class);
+ taskProcessor.setState(lifecycleManager.getDefaultLifecycle().createState(
+ "Queued", "waiting", ""));
+ runnables.add(taskProcessor);
+ processor.setSubProcessors(runnables);
+ return processor;
+ }
+
+ class MockProcessorQueue extends WorkflowProcessorQueue {
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowProcessorQueue#getProcessors
+ * ()
+ */
+ @Override
+ public synchronized List<WorkflowProcessor> getProcessors() {
+ List<WorkflowProcessor> processors = new Vector<WorkflowProcessor>();
+ try {
+ processors.add(getProcessor(10.0, "Success", "done"));
+ processors.add(getProcessor(2.0, "Loaded", "initial"));
+ processors.add(getProcessor(7.0, "Loaded", "initial"));
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ return processors;
+ }
+
+ }
+
+}