Author: mattmann
Date: Mon May 28 05:19:08 2012
New Revision: 1343141

URL: http://svn.apache.org/viewvc?rev=1343141&view=rev
Log:
- progress towards OODT-310: WIP: Port WEngine to trunk

Added:
    
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java
    
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
Modified:
    
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ParallelProcessor.java
    
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java
    
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java
    
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
    
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java

Modified: 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ParallelProcessor.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ParallelProcessor.java?rev=1343141&r1=1343140&r2=1343141&view=diff
==============================================================================
--- 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ParallelProcessor.java
 (original)
+++ 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ParallelProcessor.java
 Mon May 28 05:19:08 2012
@@ -33,6 +33,10 @@ import org.apache.oodt.cas.workflow.util
  * 
  */
 public class ParallelProcessor extends WorkflowProcessor {
+  
+  public ParallelProcessor(){
+    this(null);
+  }
 
   public ParallelProcessor(WorkflowLifecycleManager lifecycleMgr) {
     super(lifecycleMgr);

Modified: 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java?rev=1343141&r1=1343140&r2=1343141&view=diff
==============================================================================
--- 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java
 (original)
+++ 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java
 Mon May 28 05:19:08 2012
@@ -33,6 +33,10 @@ import org.apache.oodt.cas.workflow.life
  */
 public class SequentialProcessor extends WorkflowProcessor {
   
+  public SequentialProcessor(){
+    this(null);
+  }
+  
   public SequentialProcessor(WorkflowLifecycleManager lifecycleManager){
     super(lifecycleManager);
   }

Modified: 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java?rev=1343141&r1=1343140&r2=1343141&view=diff
==============================================================================
--- 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java
 (original)
+++ 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java
 Mon May 28 05:19:08 2012
@@ -40,6 +40,10 @@ public class TaskProcessor extends Workf
 
   private Class<? extends WorkflowTaskInstance> instanceClass;
   private String jobId;
+  
+  public TaskProcessor(){
+    this(null);
+  }
 
   public TaskProcessor(WorkflowLifecycleManager lifecycleManager) {
     super(lifecycleManager);

Modified: 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java?rev=1343141&r1=1343140&r2=1343141&view=diff
==============================================================================
--- 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
 (original)
+++ 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
 Mon May 28 05:19:08 2012
@@ -79,26 +79,25 @@ public class TaskQuerier implements Runn
   public void run() {
     while (running) {
       List<WorkflowProcessor> processors = processorQueue.getProcessors();
-      synchronized (runnableProcessors) {
-        runnableProcessors.clear();
-      }
+      List<WorkflowProcessor> processorsToRun = new Vector<WorkflowProcessor>();
+
       for (WorkflowProcessor processor : processors) {
         // OK now get its lifecycle
         WorkflowLifecycle lifecycle = getLifecycleForProcessor(processor);
         if (!(processor.getState().getCategory().getName().equals("done") || processor
             .getState().getCategory().getName().equals("holding"))) {
-          for (TaskProcessor tp : processor.getRunnableWorkflowProcessors()) {
-            synchronized (runnableProcessors) {
+            for (TaskProcessor tp : processor.getRunnableWorkflowProcessors()) {
               tp.setState(lifecycle.createState("Executing", "running",
                   "Added to Runnable queue"));
-              runnableProcessors.add(processor);
+              System.out.println("Added processor with priority: ["+tp.getPriority()+"]");
+              processorsToRun.add(processor);
+            }
+            
+            prioritizer.sort(processorsToRun);
+            
+            synchronized(runnableProcessors){
+              runnableProcessors = processorsToRun;
             }
-          }
-
-          // now prioritize the runnable processors
-          synchronized (runnableProcessors) {
-            prioritizer.sort(runnableProcessors);
-          }
 
         } else {
           continue;
@@ -125,7 +124,7 @@ public class TaskQuerier implements Runn
   /**
    * @return the runnableProcessors
    */
-  public synchronized List<WorkflowProcessor> getRunnableProcessors() {
+  public List<WorkflowProcessor> getRunnableProcessors() {
     return runnableProcessors;
   }
 

Modified: 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java?rev=1343141&r1=1343140&r2=1343141&view=diff
==============================================================================
--- 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
 (original)
+++ 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
 Mon May 28 05:19:08 2012
@@ -73,6 +73,7 @@ public abstract class WorkflowProcessor 
   protected WorkflowLifecycleManager lifecycleManager;
 
   public WorkflowProcessor(WorkflowLifecycleManager lifecycleManager) {
+    this.subProcessors = new Vector<WorkflowProcessor>();
     this.listeners = new Vector<WorkflowProcessorListener>();
     this.ProcessorDateTimeInfo = new ProcessorDateTimeInfo();
     this.staticMetadata = new Metadata();

Added: 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java?rev=1343141&view=auto
==============================================================================
--- 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java
 (added)
+++ 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java
 Mon May 28 05:19:08 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.List;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.workflow.structs.Priority;
+
+//Google imports
+import com.google.common.collect.Lists;
+
+/**
+ * 
+ * Builds {@link WorkflowProcessor}s.
+ * 
+ * @author bfoster
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+public class WorkflowProcessorBuilder {
+
+  private String id;
+  private double priority;
+  private List<WorkflowProcessor> subProcessors;
+  private WorkflowLifecycleManager lifecycleManager;
+
+  private WorkflowProcessorBuilder() {
+    subProcessors = Lists.newArrayList();
+  }
+
+  public static WorkflowProcessorBuilder aWorkflowProcessor() {
+    return new WorkflowProcessorBuilder();
+  }
+
+  public WorkflowProcessorBuilder withId(String id) {
+    this.id = id;
+    return this;
+  }
+
+  public WorkflowProcessorBuilder withLifecycleManager(
+      WorkflowLifecycleManager lifecycleManager) {
+    this.lifecycleManager = lifecycleManager;
+    return this;
+  }
+
+  public WorkflowProcessorBuilder withPriority(double priority) {
+    this.priority = priority;
+    return this;
+  }
+
+  public WorkflowProcessorBuilder with(WorkflowProcessorBuilder wpb,
+      Class<? extends WorkflowProcessor> clazz) throws InstantiationException,
+      IllegalAccessException {
+    subProcessors.add(wpb.build(clazz));
+    return this;
+  }
+
+  public WorkflowProcessor build(Class<? extends WorkflowProcessor> clazz)
+      throws InstantiationException, IllegalAccessException {
+    WorkflowProcessor wp = clazz.newInstance();
+    wp.getWorkflowInstance().setId(id);
+    wp.setLifecycleManager(lifecycleManager);
+    wp.setPriority(Priority.getPriority(priority));
+    wp.setSubProcessors(subProcessors);
+    return wp;
+  }
+}
\ No newline at end of file

Added: 
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java?rev=1343141&view=auto
==============================================================================
--- 
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
 (added)
+++ 
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
 Mon May 28 05:19:08 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Vector;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.workflow.structs.FILOPrioritySorter;
+import org.apache.oodt.cas.workflow.structs.Priority;
+import org.apache.oodt.cas.workflow.structs.Workflow;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+
+//Junit imports
+import junit.framework.TestCase;
+
+/**
+ * 
+ * Test harness for the {@link TaskQuerier}.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+public class TestTaskQuerier extends TestCase {
+  
+  private int dateGen;
+
+  public TestTaskQuerier() {
+    this.dateGen = 0;
+  }
+
+  public void testGetRunnableProcessors() {
+    FILOPrioritySorter prioritizer = new FILOPrioritySorter();
+    MockProcessorQueue processorQueue = new MockProcessorQueue();
+    assertNotNull(processorQueue.getProcessors());
+    assertEquals(3, processorQueue.getProcessors().size());
+    TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer);
+    Thread querierThread = new Thread(querier);
+    querierThread.start();
+    while (querier.getRunnableProcessors().size() != 2) {
+      assertNotNull(querier.getRunnableProcessors());
+    }
+
+    querier.setRunning(false);
+    assertNotNull(querier.getRunnableProcessors());
+    assertEquals(2, querier.getRunnableProcessors().size());
+    assertNotNull(querier.getRunnableProcessors().get(0));
+    assertNotNull(querier.getRunnableProcessors().get(0).getPriority());
+    assertEquals(2.0, querier.getRunnableProcessors().get(0).getPriority()
+        .getValue());
+    assertEquals(7.0, querier.getRunnableProcessors().get(1).getPriority()
+        .getValue());
+    try{
+      querierThread.join();
+    }
+    catch(InterruptedException ignore){}
+
+  }
+
+  private WorkflowProcessor getProcessor(double priority, String stateName,
+      String categoryName) throws InstantiationException, IllegalAccessException {
+    WorkflowLifecycleManager lifecycleManager = new WorkflowLifecycleManager(
+        "./src/main/resources/examples/wengine/wengine-lifecycle.xml");
+    WorkflowInstance inst = new WorkflowInstance();
+    Date sd = new Date();
+    sd.setTime(sd.getTime() + (this.dateGen * 5000));
+    this.dateGen++;
+    inst.setStartDate(sd);
+    inst.setId("winst-" + priority);
+    Workflow workflow = new Workflow();
+    workflow.setTasks(Collections.EMPTY_LIST);
+    inst.setWorkflow(workflow);
+    inst.setPriority(Priority.getPriority(priority));
+    WorkflowProcessorBuilder builder = WorkflowProcessorBuilder.aWorkflowProcessor()
+    .withLifecycleManager(lifecycleManager)
+    .withPriority(priority);
+    SequentialProcessor processor = (SequentialProcessor)builder.build(SequentialProcessor.class);
+    processor.setWorkflowInstance(inst);
+    processor.setState(lifecycleManager.getDefaultLifecycle().createState(
+        stateName, categoryName, ""));
+    assertNotNull(processor.getState());
+    assertNotNull(processor.getState().getCategory());
+    assertNotNull(processor.getState().getCategory().getName());
+    List<WorkflowProcessor> runnables = new Vector<WorkflowProcessor>();
+    TaskProcessor taskProcessor = (TaskProcessor)builder.build(TaskProcessor.class);
+    taskProcessor.setState(lifecycleManager.getDefaultLifecycle().createState(
+        "Queued", "waiting", ""));
+    runnables.add(taskProcessor);
+    processor.setSubProcessors(runnables);    
+    return processor;
+  }
+
+  class MockProcessorQueue extends WorkflowProcessorQueue {
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.oodt.cas.workflow.engine.WorkflowProcessorQueue#getProcessors
+     * ()
+     */
+    @Override
+    public synchronized List<WorkflowProcessor> getProcessors() {
+      List<WorkflowProcessor> processors = new Vector<WorkflowProcessor>();
+      try {
+        processors.add(getProcessor(10.0, "Success", "done"));
+        processors.add(getProcessor(2.0, "Loaded", "initial"));
+        processors.add(getProcessor(7.0, "Loaded", "initial"));
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+
+      return processors;
+    }
+
+  }
+
+}


Reply via email to