Author: mattmann
Date: Tue Feb 28 06:30:47 2012
New Revision: 1294480

URL: http://svn.apache.org/viewvc?rev=1294480&view=rev
Log:
OODT-381 WIP: Create Runner framework to allow flexible WorkflowTask execution 
on different runtimes
  - use #TDD to drive the construction of the asynchronous engine runner
  - clean up unit test and make tests pass
  - TODO: make parameters to async engine runner configurable via 
workflow.properties

Added:
    
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/SimpleTester.java
Modified:
    oodt/trunk/workflow/pom.xml
    
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
    
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java

Modified: oodt/trunk/workflow/pom.xml
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/pom.xml?rev=1294480&r1=1294479&r2=1294480&view=diff
==============================================================================
--- oodt/trunk/workflow/pom.xml (original)
+++ oodt/trunk/workflow/pom.xml Tue Feb 28 06:30:47 2012
@@ -106,18 +106,6 @@ the License.
       </build>
     </profile>
   </profiles>
-  <repositories>
-      <!-- FIXME: Find a better, more permanent mirror than some project's 
dependencies that they (inadvertently?) published -->
-      <repository>
-        <snapshots>
-            <enabled>false</enabled>
-        </snapshots>
-        <id>some-mirror-that-has-javamail-1.3.3</id>
-        <name>Some Mirror That Has Javamail 1.3.3</name>
-        
<url>http://mirrors.ibiblio.org/pub/mirrors/maven/mule/dependencies/maven2/</url>
-        <layout>default</layout>
-      </repository>
-  </repositories>
   <dependencies>
     <dependency>
       <groupId>org.apache.oodt</groupId>
@@ -216,6 +204,11 @@ the License.
       <version>1.1</version>
     </dependency>
     <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>2.1</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>3.8.2</version>

Modified: 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java?rev=1294480&r1=1294479&r2=1294480&view=diff
==============================================================================
--- 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
 (original)
+++ 
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
 Tue Feb 28 06:30:47 2012
@@ -21,6 +21,7 @@ package org.apache.oodt.cas.workflow.eng
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -50,8 +51,11 @@ public class AsynchronousLocalEngineRunn
 
   private Map<String, Thread> workerMap;
 
+  private final int NUM_THREADS = 25;
+
   public AsynchronousLocalEngineRunner() {
-    this.executor = new ThreadPoolExecutor(0, 0, 0, TimeUnit.SECONDS, null,
+    this.executor = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 30,
+        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
         new RejectedExecutionHandler() {
 
           @Override
@@ -105,7 +109,7 @@ public class AsynchronousLocalEngineRunn
 
     };
 
-    String id = null;
+    String id = "";
     synchronized (id) {
       id = UUID.randomUUID().toString();
       this.workerMap.put(id, worker);

Added: 
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/SimpleTester.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/SimpleTester.java?rev=1294480&view=auto
==============================================================================
--- 
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/SimpleTester.java
 (added)
+++ 
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/SimpleTester.java
 Tue Feb 28 06:30:47 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
+import 
org.apache.oodt.cas.workflow.structs.exceptions.WorkflowTaskInstanceException;
+
+/**
+ * A simple workflow task instance for tests: it writes the "StartDateTime"
+ * metadata value it was run with to a ".job" file whose path starts with the
+ * value of the "TestDirPath" task configuration property.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ */
+public class SimpleTester implements WorkflowTaskInstance {
+
+  private static final Logger LOG = Logger.getLogger(SimpleTester.class
+      .getName());
+
+  /*
+   * Writes the line "StartDateTime=<value>" to <TestDirPath>task-<value>.job,
+   * where <value> is the "StartDateTime" metadata field and <TestDirPath> is
+   * the config property of that name, used as a raw prefix (no separator is
+   * added here). A FileNotFoundException is rethrown as a
+   * WorkflowTaskInstanceException that carries only the original message.
+   * See WorkflowTaskInstance#run(Metadata, WorkflowTaskConfiguration).
+   */
+  @Override
+  public void run(Metadata metadata, WorkflowTaskConfiguration config)
+      throws WorkflowTaskInstanceException {
+    PrintWriter pw = null;
+    try {
+      String jobFilePath = config.getProperty("TestDirPath") + "task-"
+          + metadata.getMetadata("StartDateTime") + ".job";
+      LOG.log(Level.INFO, "Creating job file: [" + jobFilePath + "]");
+      pw = new PrintWriter(new FileOutputStream(jobFilePath));
+      pw.println("StartDateTime=" + metadata.getMetadata("StartDateTime"));
+    } catch (FileNotFoundException e) {
+      throw new WorkflowTaskInstanceException(e.getMessage());
+    } finally {
+      if (pw != null) {
+        try {
+          pw.close();
+        } catch (Exception ignore) { // best-effort close: failures are ignored
+        }
+        pw = null;
+      }
+    }
+
+  }
+}

Modified: 
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java?rev=1294480&r1=1294479&r2=1294480&view=diff
==============================================================================
--- 
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
 (original)
+++ 
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
 Tue Feb 28 06:30:47 2012
@@ -21,10 +21,7 @@ package org.apache.oodt.cas.workflow.eng
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.InputStreamReader;
-import java.io.PrintWriter;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
@@ -33,11 +30,13 @@ import java.util.Date;
 import org.apache.oodt.cas.metadata.Metadata;
 import org.apache.oodt.cas.workflow.structs.WorkflowTask;
 import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
-import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
-import 
org.apache.oodt.cas.workflow.structs.exceptions.WorkflowTaskInstanceException;
 import org.apache.oodt.commons.date.DateUtils;
 import org.apache.oodt.commons.util.DateConvert;
 
+//JODA imports
+import org.joda.time.DateTime;
+import org.joda.time.Seconds;
+
 //Junit imports
 import junit.framework.TestCase;
 
@@ -59,24 +58,29 @@ public class TestAsynchronousLocalEngine
     WorkflowTask task = new WorkflowTask();
     task.setConditions(Collections.emptyList());
     task.setRequiredMetFields(Collections.emptyList());
-    task.setTaskConfig(new WorkflowTaskConfiguration());
     task.setTaskId("urn:cas:workflow:tester");
     task.setTaskInstanceClassName(SimpleTester.class.getName());
     task.setTaskName("Tester");
+    WorkflowTaskConfiguration config = new WorkflowTaskConfiguration();
+    config.addConfigProperty("TestDirPath",
+        testDir.getAbsolutePath().endsWith("/") ? testDir.getAbsolutePath()
+            : testDir.getAbsolutePath() + "/");
+    task.setTaskConfig(config);
     Metadata met = new Metadata();
     met.addMetadata("StartDateTime", 
DateUtils.toString(Calendar.getInstance()));
-    met.addMetadata("TestDir", testDir.getAbsolutePath());
     try {
       runner.execute(task, met);
       runner.execute(task, met);
       assertTrue(ranFast());
     } catch (Exception e) {
+      e.printStackTrace();
       fail(e.getMessage());
     }
   }
 
   private boolean ranFast() {
     boolean ranFast = true;
+    int jobNum = 1;
     for (File f : this.testDir.listFiles()) {
       BufferedReader br = null;
       try {
@@ -88,11 +92,14 @@ public class TestAsynchronousLocalEngine
 
         String[] toks = line.split(",");
         Date dateTime = DateConvert.isoParse(toks[1]);
-        // FIXME: compare the date time with the current
-        // date time and make sure that it's not larger
-        // than 30 seconds for any of the files (should be 2)
-        // in there
+        Seconds seconds = Seconds.secondsBetween(new DateTime(dateTime),
+            new DateTime());
+        if (seconds.getSeconds() > 30) {
+          fail("More than 30 seconds elapsed now and running job " + jobNum
+              + ": seconds elapsed: [" + seconds.getSeconds() + "]");
+        }
       } catch (Exception e) {
+        e.printStackTrace();
         fail(e.getMessage());
         ranFast = false;
       } finally {
@@ -110,38 +117,6 @@ public class TestAsynchronousLocalEngine
     return ranFast;
   }
 
-  private class SimpleTester implements WorkflowTaskInstance {
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance#run(org.apache
-     * .oodt.cas.metadata.Metadata,
-     * org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration)
-     */
-    @Override
-    public void run(Metadata metadata, WorkflowTaskConfiguration config)
-        throws WorkflowTaskInstanceException {
-      PrintWriter pw = null;
-      try {
-        pw = new PrintWriter(new FileOutputStream(testDir.getAbsolutePath()
-            + "/" + "task-" + metadata.getMetadata("StartDateTime")));
-        pw.println("StartDateTime=" + metadata.getMetadata("StartDateTime"));
-      } catch (FileNotFoundException e) {
-        throw new WorkflowTaskInstanceException(e.getMessage());
-      } finally {
-        if (pw != null) {
-          try {
-            pw.close();
-          } catch (Exception ignore) {
-          }
-          pw = null;
-        }
-      }
-
-    }
-  }
-
   /*
    * (non-Javadoc)
    * 
@@ -149,7 +124,12 @@ public class TestAsynchronousLocalEngine
    */
   @Override
   protected void setUp() throws Exception {
-    testDir = File.createTempFile("test", "txt").getParentFile();
+    String parentPath = File.createTempFile("test", 
"txt").getParentFile().getAbsolutePath();
+    parentPath = parentPath.endsWith("/") ? parentPath:parentPath + "/";
+    String testJobDirPath = parentPath + "jobs";
+    testDir = new File(testJobDirPath);
+    testDir.mkdirs();
+    this.runner = new AsynchronousLocalEngineRunner();
   }
 
   /*
@@ -164,6 +144,7 @@ public class TestAsynchronousLocalEngine
     deleteAllFiles(testDir.getAbsolutePath());
     testDir.delete();
     testDir = null;
+    this.runner = null;
   }
 
   private void deleteAllFiles(String startDir) {


Reply via email to