Author: bfoster
Date: Fri Jan 28 00:18:17 2011
New Revision: 1064378

URL: http://svn.apache.org/viewvc?rev=1064378&view=rev
Log:

- beginnings of plugging wengine into Hadoop

------------------------

Added:
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java   (with props)
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java   (with props)
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java   (with props)
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java   (with props)
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java   (with props)
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java   (with props)
Modified:
    oodt/branches/wengine-branch/wengine/pom.xml

Modified: oodt/branches/wengine-branch/wengine/pom.xml
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/pom.xml?rev=1064378&r1=1064377&r2=1064378&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/pom.xml (original)
+++ oodt/branches/wengine-branch/wengine/pom.xml Fri Jan 28 00:18:17 2011
@@ -170,6 +170,11 @@
                        <version>2.6.2</version>
                </dependency>
                <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-mapred</artifactId>
+                       <version>0.21.1-SNAPSHOT</version>
+               </dependency>
+               <dependency>
                        <groupId>junit</groupId>
                        <artifactId>junit</artifactId>
                        <version>3.8.2</version>

Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,13 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+public interface HadoopMapReduceable {
+
+       public String getMapperClass();
+       
+       public String getCombinerClass();
+       
+       public String getReducerClass();
+       
+       public int getNumOfReducers();
+       
+}

Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

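For context on how this interface is meant to be consumed: HadoopRunner (added below) checks its TaskInstance with instanceof and, if the interface is present, wires the named classes into the Hadoop Job via Class.forName. A minimal sketch of an implementor follows; the class name and the fully qualified names it returns are illustrative placeholders, not part of this commit, and a real implementor would also extend TaskInstance so the runner can execute it.

package org.apache.oodt.cas.workflow.engine.runner;

// Hypothetical implementor (not in this commit): advertises the MapReduce
// classes HadoopRunner should plug into the Job via Class.forName.
public class WordCountMapReduceable implements HadoopMapReduceable {

        public String getMapperClass() {
                // Placeholder class name; any Mapper subclass on the job classpath works.
                return "org.apache.hadoop.examples.WordCount$TokenizerMapper";
        }

        public String getCombinerClass() {
                return "org.apache.hadoop.examples.WordCount$IntSumReducer";
        }

        public String getReducerClass() {
                return "org.apache.hadoop.examples.WordCount$IntSumReducer";
        }

        public int getNumOfReducers() {
                return 4; // arbitrary illustrative value
        }
}
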
Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,111 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+import java.util.List;
+import java.util.Vector;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.instance.TaskInstance;
+
+import com.thoughtworks.xstream.XStream;
+
+public class HadoopRunner extends EngineRunner {
+
+       public static final String DEPENDENCY_FILE_KEYS = "HadoopRunner/DependencyFileKeys";
+       public static final String INPUT_FILE_KEYS = "HadoopRunner/InputFileKeys";
+       
+       @Override
+       public void execute(TaskInstance workflowInstance) throws Exception {
+               Metadata instanceMetadata = workflowInstance.getMetadata();
+           Configuration conf = new Configuration();
+           Cluster cluster = new Cluster(conf);
+           Job job = Job.getInstance(cluster);
+           job.setJobName(workflowInstance.getModelId());
+           job.setJarByClass(HadoopRunner.class);
+
+           //setup input files
+           List<String> inputFiles = this.getInputFiles(instanceMetadata);
+           for (String inputFile : inputFiles)
+               FileInputFormat.addInputPath(job, new Path(inputFile));
+       conf.set(HadoopTaskProperties.INPUT_FILES, StringUtils.join(inputFiles, ","));
+       
+       //setup deps files
+           List<String> dependencyFiles = this.getDependencyFiles(instanceMetadata);
+           for (String dependencyFile : dependencyFiles) 
+               job.addCacheFile(new Path(dependencyFile).toUri());
+       conf.set(HadoopTaskProperties.DEPENDENCY_FILES, StringUtils.join(dependencyFiles, ","));
+
+               if (workflowInstance instanceof HadoopMapReduceable) {
+                       job.setMapperClass((Class<? extends Mapper>) Class.forName(((HadoopMapReduceable) workflowInstance).getMapperClass()));
+                       job.setCombinerClass((Class<? extends Reducer>) Class.forName(((HadoopMapReduceable) workflowInstance).getCombinerClass()));
+                       job.setReducerClass((Class<? extends Reducer>) Class.forName(((HadoopMapReduceable) workflowInstance).getReducerClass()));
+                       job.setNumReduceTasks(((HadoopMapReduceable) workflowInstance).getNumOfReducers());
+                       job.setOutputFormatClass(HadoopTaskOutputFormat.class);
+               }else {
+                       job.setMapperClass(HadoopTaskInstance.class);
+                       job.setNumReduceTasks(0);
+                       XStream xstream = new XStream();
+                       String xstreamTask = "/temp/job-input/xstream/" + workflowInstance.getInstanceId() + "/" + workflowInstance.getModelId() + ".xstream";
+                       String xmlTask = xstream.toXML(workflowInstance);
+                       //write xmlTask to xstreamTask;
+               FileInputFormat.addInputPath(job, new Path(xstreamTask));
+               }
+               
+               job.submit();
+       }
+       
+       private List<String> getInputFiles(Metadata metadata) throws Exception {
+               Vector<String> inputFiles = new Vector<String>();
+               for (String inputFileKey : metadata.getMetadata(INPUT_FILE_KEYS).split(","))
+                       inputFiles.addAll(metadata.getAllMetadata(inputFileKey));
+               return inputFiles;
+       }
+       
+       private List<String> getDependencyFiles(Metadata metadata) throws Exception {
+               Vector<String> dependencyFiles = new Vector<String>();
+               for (String dependencyFileKey : metadata.getMetadata(DEPENDENCY_FILE_KEYS).split(","))
+                       dependencyFiles.addAll(metadata.getAllMetadata(dependencyFileKey));
+               return dependencyFiles;
+       }
+       
+       @Override
+       public int getOpenSlots(TaskInstance workflowInstance) throws Exception {
+               return Integer.MAX_VALUE;
+       }
+
+       @Override
+       public boolean hasOpenSlots(TaskInstance workflowInstance) throws Exception {
+               return true;
+       }
+
+//     public static class MapperTask extends Mapper<Object, Text, Text, File> {
+//             private Text outputFilesText = new Text("OutputFiles");
+//
+//             public void map(Object key, Text value, Context context)
+//                             throws IOException, InterruptedException {
+//                     Path[] localFiles = context.getLocalCacheFiles();
+//                     File outputFile = new File("/path/to/output/file");
+//                     int returnValue = ExecUtils.callProgram("/usr/bin/time -v -o log/runtime_" + value.toString() + ".txt  /path/to/exe config/${config_type}/l2_fp.config output/l2_" + value.toString() + ".h5) > log/l2_" + value.toString() + ".log.running 2>&1", (File) null);
+//                     ExecUtils.callProgram("mv -f log/l2_" + value.toString() + ".log.running log/l2_" + value.toString() + ".log", (File) null);
+//                     context.write(outputFilesText, outputFile);
+//             }
+//     }
+//
+//     public static class ReducerTask extends Reducer<Text, File, Text, File> {
+//             private Text outputFileText = new Text("OutputFile");
+//
+//             public void reduce(Text key, Iterable<File> values,
+//                             Context context) throws IOException, InterruptedException {
+//                     File outputFile = new File("/path/to/output/concat/file");
+//                     context.write(outputFileText, outputFile);
+//             }
+//     }
+       
+}

Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

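A note on the two metadata keys above: INPUT_FILE_KEYS and DEPENDENCY_FILE_KEYS each hold a comma-separated list of other metadata key names, and those named keys in turn hold the actual file paths. A small sketch of that indirection, using the CAS Metadata calls the runner itself relies on; all key names and paths below are made up:

package org.apache.oodt.cas.workflow.engine.runner;

import org.apache.oodt.cas.metadata.Metadata;

public class HadoopRunnerMetadataSketch {

        public static Metadata exampleMetadata() {
                Metadata m = new Metadata();
                // Level 1: the runner's keys name other keys...
                m.addMetadata(HadoopRunner.INPUT_FILE_KEYS, "L1BFiles,CalFiles");   // made-up key names
                m.addMetadata(HadoopRunner.DEPENDENCY_FILE_KEYS, "ConfigFiles");
                // Level 2: ...and those keys hold the actual paths.
                m.addMetadata("L1BFiles", "/data/in/granule1.h5");                  // made-up paths
                m.addMetadata("L1BFiles", "/data/in/granule2.h5");
                m.addMetadata("CalFiles", "/data/cal/coeffs.dat");
                m.addMetadata("ConfigFiles", "/config/l2_fp.config");
                // getInputFiles(m) would resolve to the three input paths, each of
                // which the runner registers with FileInputFormat.addInputPath, and
                // getDependencyFiles(m) to the config file, added as a cache file.
                return m;
        }
}
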
Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,23 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.oodt.cas.workflow.instance.TaskInstance;
+
+import com.thoughtworks.xstream.XStream;
+
+public class HadoopTaskInstance extends Mapper<Text, Text, Text, File> {
+
+       public static final String XSTREAM_TASK = "hadoop.cas.xstream.task";
+       
+       public void map(Text key, Text value, Context context)
+               throws IOException, InterruptedException {
+               XStream xstream = new XStream();
+               TaskInstance taskInstance = (TaskInstance) xstream.fromXML(context.getConfiguration().get(XSTREAM_TASK));
+               taskInstance.execute();
+       }
+       
+}

Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

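The mapper above expects the XStream-serialized task under the XSTREAM_TASK configuration key, while HadoopRunner currently writes the XStream XML toward an HDFS input path (its "//write xmlTask to xstreamTask" step is still a stub), so the two halves are not fully wired together in this commit. A sketch of the configuration-based handoff the mapper expects, assuming only the XStream and Configuration APIs already used above:

package org.apache.oodt.cas.workflow.engine.runner;

import org.apache.hadoop.conf.Configuration;
import org.apache.oodt.cas.workflow.instance.TaskInstance;

import com.thoughtworks.xstream.XStream;

public class XStreamHandoffSketch {

        // Runner side: serialize the task into the job configuration.
        public static void stashTask(Configuration conf, TaskInstance task) {
                conf.set(HadoopTaskInstance.XSTREAM_TASK, new XStream().toXML(task));
        }

        // Mapper side: what HadoopTaskInstance.map effectively does per record.
        public static void runStashedTask(Configuration conf) throws Exception {
                TaskInstance task = (TaskInstance) new XStream().fromXML(conf.get(HadoopTaskInstance.XSTREAM_TASK));
                task.execute();
        }
}
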
Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,117 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.oodt.cas.workflow.engine.WorkflowEngineClient;
+import org.apache.oodt.cas.workflow.engine.WorkflowEngineClientFactory;
+import org.apache.oodt.cas.workflow.server.channel.xmlrpc.XmlRpcCommunicationChannelClientFactory;
+import org.apache.oodt.cas.workflow.state.done.FailureState;
+import org.apache.oodt.cas.workflow.state.done.StoppedState;
+import org.apache.oodt.cas.workflow.state.done.SuccessState;
+import org.apache.oodt.cas.workflow.state.running.ExecutingState;
+
+public class HadoopTaskOutputCommitter extends OutputCommitter {
+
+       @Override
+       public void abortTask(TaskAttemptContext taskContext) throws IOException {
+               new File(taskContext.getWorkingDirectory().toString()).delete();
+       }
+
+       @Override
+       public void commitTask(TaskAttemptContext taskContext) throws IOException {
+               new File(taskContext.getWorkingDirectory().toString()).delete();
+       }
+
+       @Override
+       public boolean needsTaskCommit(TaskAttemptContext taskContext)
+                       throws IOException {
+               return taskContext.getWorkingDirectory() != null;
+       }
+
+       @Override
+       public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+               super.abortJob(jobContext, state);
+               this.cleanupJobFiles(jobContext);
+               try {
+                       WorkflowEngineClient wmClient = this.getWorkflowClient(jobContext);
+                       if (state.equals(JobStatus.State.FAILED))
+                               wmClient.setWorkflowState(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_INSTANCE_ID), jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_MODEL_ID), new FailureState(""));
+                       else if (state.equals(JobStatus.State.KILLED))
+                               wmClient.setWorkflowState(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_INSTANCE_ID), jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_MODEL_ID), new StoppedState(""));
+               }catch (Exception e) {
+                       throw new IOException();
+               }
+       }
+       
+       @Override
+       public void commitJob(JobContext jobContext) throws IOException {
+               super.commitJob(jobContext);
+               this.cleanupJobFiles(jobContext);
+               try {
+                       WorkflowEngineClient wmClient = this.getWorkflowClient(jobContext);
+                       wmClient.setWorkflowState(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_INSTANCE_ID), jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_MODEL_ID), new SuccessState(""));
+               }catch (Exception e) {
+                       throw new IOException();
+               }
+       }
+       
+       private void cleanupJobFiles(JobContext jobContext) throws IOException {
+               try {
+                       Cluster cluster = new Cluster(jobContext.getConfiguration());
+                       for (String inputFile : jobContext.getConfiguration().get(HadoopTaskProperties.INPUT_FILES).split(","))
+                               cluster.getFileSystem().delete(new Path(inputFile), false);
+                   for (String dependencyFile : jobContext.getConfiguration().get(HadoopTaskProperties.DEPENDENCY_FILES).split(","))
+                           cluster.getFileSystem().delete(new Path(dependencyFile), false);
+               }catch (Exception e) {
+                       throw new IOException("Failed to cleanup job files : " + e.getMessage(), e);
+               }
+       }
+       
+       @Override
+       public void setupJob(JobContext jobContext) throws IOException {
+               try {
+                   Cluster cluster = new Cluster(jobContext.getConfiguration());
+                   for (String inputFile : jobContext.getConfiguration().get(HadoopTaskProperties.INPUT_FILES).split(",")) {
+                       Path inputFilePath = new Path(inputFile);
+                           cluster.getFileSystem().copyFromLocalFile(inputFilePath, inputFilePath);
+                   }
+                   for (String dependencyFile : jobContext.getConfiguration().get(HadoopTaskProperties.DEPENDENCY_FILES).split(",")) {
+                       Path dependencyFilePath = new Path(dependencyFile);
+                           cluster.getFileSystem().copyFromLocalFile(dependencyFilePath, dependencyFilePath);
+                   }
+                   try {
+                       WorkflowEngineClient wmClient = this.getWorkflowClient(jobContext);
+                       wmClient.setWorkflowState(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_INSTANCE_ID), jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_MODEL_ID), new ExecutingState(""));
+                   }catch (Exception e) {}
+               }catch (Exception e) {
+                       throw new IOException("Failed to setup job '" + jobContext.getJobName() + "' : " + e.getMessage(), e);
+               }
+       }
+
+       @Override
+       public void setupTask(TaskAttemptContext taskContext) throws IOException {
+               new File(taskContext.getWorkingDirectory().toString()).mkdirs();
+       }
+       
+       private WorkflowEngineClient getWorkflowClient(JobContext jobContext) {
+           XmlRpcCommunicationChannelClientFactory xmlrpcFactory = new XmlRpcCommunicationChannelClientFactory();
+           xmlrpcFactory.setChunkSize(1024);
+           xmlrpcFactory.setConnectionRetries(10);
+           xmlrpcFactory.setConnectionRetryIntervalSecs(30);
+           xmlrpcFactory.setConnectionTimeout(60);
+           xmlrpcFactory.setRequestTimeout(20);
+           xmlrpcFactory.setServerUrl(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_URL));
+           WorkflowEngineClientFactory wengineFactory = new WorkflowEngineClientFactory();
+           wengineFactory.setAutoPagerSize(1000);
+           wengineFactory.setCommunicationChannelClientFactory(xmlrpcFactory);
+           return wengineFactory.createEngine();
+       }
+
+}

Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

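The committer is where the Hadoop job lifecycle is reported back to wengine: setupJob marks the workflow ExecutingState, commitJob marks SuccessState, and abortJob maps FAILED to FailureState and KILLED to StoppedState. That mapping isolated as a sketch; the WorkflowState supertype is an assumption inferred from the state packages imported above, and the empty-string messages mirror the committed code:

package org.apache.oodt.cas.workflow.engine.runner;

import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.oodt.cas.workflow.state.WorkflowState;  // assumed supertype of the states below
import org.apache.oodt.cas.workflow.state.done.FailureState;
import org.apache.oodt.cas.workflow.state.done.StoppedState;

public class AbortStateMappingSketch {

        // The state abortJob reports for a given Hadoop job outcome.
        public static WorkflowState abortState(JobStatus.State state) {
                if (state.equals(JobStatus.State.KILLED))
                        return new StoppedState("");
                // abortJob only sets a state for FAILED and KILLED; treating any
                // other abort cause as failure is this sketch's assumption.
                return new FailureState("");
        }
}
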
Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,32 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class HadoopTaskOutputFormat extends OutputFormat<Text, Text> {
+
+       @Override
+       public void checkOutputSpecs(JobContext context) throws IOException,
+                       InterruptedException {
+               // TODO Auto-generated method stub
+       }
+
+       @Override
+       public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+                       throws IOException, InterruptedException {
+               return new HadoopTaskOutputCommitter();
+       }
+
+       @Override
+       public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
+                       throws IOException, InterruptedException {
+               return null;
+       }
+
+}

Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,14 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+public class HadoopTaskProperties {
+
+       public static final String WORKFLOW_URL = "hadoop.cas.task.wm.url";
+       public static final String WORKFLOW_INSTANCE_ID = "hadoop.cas.task.wm.instance.id";
+       public static final String WORKFLOW_MODEL_ID = "hadoop.cas.task.wm.model.id";
+
+       public static final String DEPENDENCY_FILES = "hadoop.cas.task.dependency.files";
+       public static final String INPUT_FILES = "hadoop.cas.task.input.files";
+       
+       private HadoopTaskProperties() {}
+       
+}

Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

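These keys tie the pieces together: HadoopRunner fills INPUT_FILES and DEPENDENCY_FILES, and HadoopTaskOutputCommitter reads all five to stage and clean up files and to report state back to the workflow manager. A sketch of populating them on a Configuration before job submission; the URL, IDs, and paths are placeholders:

package org.apache.oodt.cas.workflow.engine.runner;

import org.apache.hadoop.conf.Configuration;

public class HadoopTaskPropertiesSketch {

        public static Configuration exampleConf() {
                Configuration conf = new Configuration();
                // Where HadoopTaskOutputCommitter's XML-RPC client should call home (placeholder URL).
                conf.set(HadoopTaskProperties.WORKFLOW_URL, "http://localhost:9001");
                // Which workflow the job's outcome is reported against (placeholder IDs).
                conf.set(HadoopTaskProperties.WORKFLOW_INSTANCE_ID, "instance-123");
                conf.set(HadoopTaskProperties.WORKFLOW_MODEL_ID, "MyWorkflowModel");
                // Comma-separated lists, matching how the runner and committer split(",") them.
                conf.set(HadoopTaskProperties.INPUT_FILES, "/data/in/granule1.h5,/data/in/granule2.h5");
                conf.set(HadoopTaskProperties.DEPENDENCY_FILES, "/config/l2_fp.config");
                return conf;
        }
}
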
