Author: mattmann
Date: Tue Feb 28 06:30:47 2012
New Revision: 1294480
URL: http://svn.apache.org/viewvc?rev=1294480&view=rev
Log:
OODT-381 WIP: Create Runner framework to allow flexible WorkflowTask execution
on different runtimes
- use #TDD to drive the construction of the asynchronous engine runner
- clean up unit test and make tests pass
- TODO: make parameters to async engine runner configurable via
workflow.properties
Added:
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/SimpleTester.java
Modified:
oodt/trunk/workflow/pom.xml
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
Modified: oodt/trunk/workflow/pom.xml
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/pom.xml?rev=1294480&r1=1294479&r2=1294480&view=diff
==============================================================================
--- oodt/trunk/workflow/pom.xml (original)
+++ oodt/trunk/workflow/pom.xml Tue Feb 28 06:30:47 2012
@@ -106,18 +106,6 @@ the License.
</build>
</profile>
</profiles>
- <repositories>
- <!-- FIXME: Find a better, more permanent mirror than some project's dependencies that they (inadvertently?) published -->
- <repository>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- <id>some-mirror-that-has-javamail-1.3.3</id>
- <name>Some Mirror That Has Javamail 1.3.3</name>
- <url>http://mirrors.ibiblio.org/pub/mirrors/maven/mule/dependencies/maven2/</url>
- <layout>default</layout>
- </repository>
- </repositories>
<dependencies>
<dependency>
<groupId>org.apache.oodt</groupId>
@@ -216,6 +204,11 @@ the License.
<version>1.1</version>
</dependency>
<dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.1</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.2</version>
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java?rev=1294480&r1=1294479&r2=1294480&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java Tue Feb 28 06:30:47 2012
@@ -21,6 +21,7 @@ package org.apache.oodt.cas.workflow.eng
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -50,8 +51,11 @@ public class AsynchronousLocalEngineRunn
private Map<String, Thread> workerMap;
+ private final int NUM_THREADS = 25;
+
public AsynchronousLocalEngineRunner() {
- this.executor = new ThreadPoolExecutor(0, 0, 0, TimeUnit.SECONDS, null,
+ this.executor = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 30,
+ TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new RejectedExecutionHandler() {
@Override
@@ -105,7 +109,7 @@ public class AsynchronousLocalEngineRunn
};
- String id = null;
+ String id = "";
synchronized (id) {
id = UUID.randomUUID().toString();
this.workerMap.put(id, worker);
Added:
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/SimpleTester.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/SimpleTester.java?rev=1294480&view=auto
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/SimpleTester.java (added)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/SimpleTester.java Tue Feb 28 06:30:47 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
+import org.apache.oodt.cas.workflow.structs.exceptions.WorkflowTaskInstanceException;
+
+/**
+ *
+ * A simple workflow task instance that writes its start date time to a file.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class SimpleTester implements WorkflowTaskInstance {
+
+ private static final Logger LOG = Logger.getLogger(SimpleTester.class
+ .getName());
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance#run(org.apache
+ * .oodt.cas.metadata.Metadata,
+ * org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration)
+ */
+ @Override
+ public void run(Metadata metadata, WorkflowTaskConfiguration config)
+ throws WorkflowTaskInstanceException {
+ PrintWriter pw = null;
+ try {
+ String jobFilePath = config.getProperty("TestDirPath") + "task-"
+ + metadata.getMetadata("StartDateTime") + ".job";
+ LOG.log(Level.INFO, "Creating job file: [" + jobFilePath + "]");
+ pw = new PrintWriter(new FileOutputStream(jobFilePath));
+ pw.println("StartDateTime=" + metadata.getMetadata("StartDateTime"));
+ } catch (FileNotFoundException e) {
+ throw new WorkflowTaskInstanceException(e.getMessage());
+ } finally {
+ if (pw != null) {
+ try {
+ pw.close();
+ } catch (Exception ignore) {
+ }
+ pw = null;
+ }
+ }
+
+ }
+}
Modified:
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java?rev=1294480&r1=1294479&r2=1294480&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java Tue Feb 28 06:30:47 2012
@@ -21,10 +21,7 @@ package org.apache.oodt.cas.workflow.eng
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.InputStreamReader;
-import java.io.PrintWriter;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
@@ -33,11 +30,13 @@ import java.util.Date;
import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
-import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
-import org.apache.oodt.cas.workflow.structs.exceptions.WorkflowTaskInstanceException;
import org.apache.oodt.commons.date.DateUtils;
import org.apache.oodt.commons.util.DateConvert;
+//JODA imports
+import org.joda.time.DateTime;
+import org.joda.time.Seconds;
+
//Junit imports
import junit.framework.TestCase;
@@ -59,24 +58,29 @@ public class TestAsynchronousLocalEngine
WorkflowTask task = new WorkflowTask();
task.setConditions(Collections.emptyList());
task.setRequiredMetFields(Collections.emptyList());
- task.setTaskConfig(new WorkflowTaskConfiguration());
task.setTaskId("urn:cas:workflow:tester");
task.setTaskInstanceClassName(SimpleTester.class.getName());
task.setTaskName("Tester");
+ WorkflowTaskConfiguration config = new WorkflowTaskConfiguration();
+ config.addConfigProperty("TestDirPath",
+ testDir.getAbsolutePath().endsWith("/") ? testDir.getAbsolutePath()
+ : testDir.getAbsolutePath() + "/");
+ task.setTaskConfig(config);
Metadata met = new Metadata();
met.addMetadata("StartDateTime",
DateUtils.toString(Calendar.getInstance()));
- met.addMetadata("TestDir", testDir.getAbsolutePath());
try {
runner.execute(task, met);
runner.execute(task, met);
assertTrue(ranFast());
} catch (Exception e) {
+ e.printStackTrace();
fail(e.getMessage());
}
}
private boolean ranFast() {
boolean ranFast = true;
+ int jobNum = 1;
for (File f : this.testDir.listFiles()) {
BufferedReader br = null;
try {
@@ -88,11 +92,14 @@ public class TestAsynchronousLocalEngine
String[] toks = line.split(",");
Date dateTime = DateConvert.isoParse(toks[1]);
- // FIXME: compare the date time with the current
- // date time and make sure that it's not larger
- // than 30 seconds for any of the files (should be 2)
- // in there
+ Seconds seconds = Seconds.secondsBetween(new DateTime(dateTime),
+ new DateTime());
+ if (seconds.getSeconds() > 30) {
+ fail("More than 30 seconds elapsed now and running job " + jobNum
+ + ": seconds elapsed: [" + seconds.getSeconds() + "]");
+ }
} catch (Exception e) {
+ e.printStackTrace();
fail(e.getMessage());
ranFast = false;
} finally {
@@ -110,38 +117,6 @@ public class TestAsynchronousLocalEngine
return ranFast;
}
- private class SimpleTester implements WorkflowTaskInstance {
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance#run(org.apache
- * .oodt.cas.metadata.Metadata,
- * org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration)
- */
- @Override
- public void run(Metadata metadata, WorkflowTaskConfiguration config)
- throws WorkflowTaskInstanceException {
- PrintWriter pw = null;
- try {
- pw = new PrintWriter(new FileOutputStream(testDir.getAbsolutePath()
- + "/" + "task-" + metadata.getMetadata("StartDateTime")));
- pw.println("StartDateTime=" + metadata.getMetadata("StartDateTime"));
- } catch (FileNotFoundException e) {
- throw new WorkflowTaskInstanceException(e.getMessage());
- } finally {
- if (pw != null) {
- try {
- pw.close();
- } catch (Exception ignore) {
- }
- pw = null;
- }
- }
-
- }
- }
-
/*
* (non-Javadoc)
*
@@ -149,7 +124,12 @@ public class TestAsynchronousLocalEngine
*/
@Override
protected void setUp() throws Exception {
- testDir = File.createTempFile("test", "txt").getParentFile();
+ String parentPath = File.createTempFile("test", "txt").getParentFile().getAbsolutePath();
+ parentPath = parentPath.endsWith("/") ? parentPath:parentPath + "/";
+ String testJobDirPath = parentPath + "jobs";
+ testDir = new File(testJobDirPath);
+ testDir.mkdirs();
+ this.runner = new AsynchronousLocalEngineRunner();
}
/*
@@ -164,6 +144,7 @@ public class TestAsynchronousLocalEngine
deleteAllFiles(testDir.getAbsolutePath());
testDir.delete();
testDir = null;
+ this.runner = null;
}
private void deleteAllFiles(String startDir) {