http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-hadoop/pom.xml ---------------------------------------------------------------------- diff --git a/taverna-execution-hadoop/pom.xml b/taverna-execution-hadoop/pom.xml deleted file mode 100644 index 039f3f2..0000000 --- a/taverna-execution-hadoop/pom.xml +++ /dev/null @@ -1,33 +0,0 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.taverna.engine</groupId> - <artifactId>taverna-engine</artifactId> - <version>3.1.0-incubating-SNAPSHOT</version> - </parent> - <artifactId>taverna-execution-hadoop</artifactId> - <name>Apache Taverna Hadoop Workflow Execution Service</name> - <description>An (experimental) Execution Service for executing Taverna workflows using Hadoop</description> - <dependencies> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>taverna-reference-api</artifactId> - <version>${project.parent.version}</version> - </dependency> - <!-- <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>taverna-workflowmodel-api</artifactId> - <version>${project.parent.version}</version> - </dependency> --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <version>0.20.204.0</version> - </dependency> - <!-- <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapred</artifactId> - <version>0.23.0-SNAPSHOT</version> - </dependency> --> - </dependencies> -</project>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-hadoop/src/main/java/org/pingel/util/CrossProduct.java ---------------------------------------------------------------------- diff --git a/taverna-execution-hadoop/src/main/java/org/pingel/util/CrossProduct.java b/taverna-execution-hadoop/src/main/java/org/pingel/util/CrossProduct.java deleted file mode 100644 index 1e3b4c1..0000000 --- a/taverna-execution-hadoop/src/main/java/org/pingel/util/CrossProduct.java +++ /dev/null @@ -1,159 +0,0 @@ -/*** - * Copyright (c) 2008 Adam Pingel - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. -* 2. Redistributions in binary form must reproduce the above copyright -* notice, this list of conditions and the following disclaimer in the -* documentation and/or other materials provided with the distribution. -* 3. The name of the author may not be used to endorse or promote products -* derived from this software without specific prior written permission. -* -* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR -* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
/**
 * Lazily enumerates the Cartesian product of a sequence of iterables.
 *
 * <p>Each element produced by {@link #iterator()} is a freshly allocated list
 * holding one value from every component iterable, in component order. The
 * first component varies fastest. A component is re-iterated (its
 * {@code iterator()} method is called again) every time it wraps around, so
 * every component must be iterable more than once.
 *
 * <p>If any component is empty the product is empty. With no components at
 * all the product consists of exactly one empty tuple.
 *
 * @param <E> element type of the component iterables
 */
public class CrossProduct<E> implements Iterable<List<E>>
{
    private List<Iterable<E>> iterables = new ArrayList<Iterable<E>>();

    /**
     * Creates a product over the given components.
     *
     * @param iterables the components, in tuple order; the list is copied
     */
    public CrossProduct(List<? extends Iterable<E>> iterables)
    {
        this.iterables.addAll(iterables);
    }

    /**
     * Creates a product over the given components.
     *
     * @param iterables the components, in tuple order
     */
    public CrossProduct(Iterable<E>... iterables)
    {
        this.iterables = new ArrayList<Iterable<E>>();
        for( Iterable<E> it : iterables ) {
            this.iterables.add(it);
        }
    }

    /**
     * Returns the live internal list of components (not a copy).
     *
     * @return the components in tuple order
     */
    public List<? extends Iterable<E>> getCollections()
    {
        return iterables;
    }

    /**
     * Appends another component; it becomes the last position of every tuple
     * produced by iterators created afterwards.
     *
     * @param iterable the component to append
     */
    public void addIterable(Iterable<E> iterable)
    {
        this.iterables.add(iterable);
    }

    /**
     * Returns a new iterator over all tuples of the product.
     *
     * @return an iterator that does not support {@code remove()}
     */
    @Override
    public Iterator<List<E>> iterator()
    {
        return new CrossProductIterator<E>(this);
    }

    /**
     * Small demonstration: prints every tuple of a four-component product.
     *
     * @param argv ignored
     */
    public static void main(String[] argv)
    {
        List<String> v1 = new ArrayList<String>();
        List<String> v2 = new ArrayList<String>();
        List<String> v3 = new ArrayList<String>();

        v1.add("a");
        v1.add("b");
        v2.add("0");
        v2.add("1");
        v3.add("X");

        CrossProduct<String> cp = new CrossProduct<String>(v1, v2, v3, v2);

        Iterator<List<String>> it = cp.iterator();
        while( it.hasNext() ) {
            List<String> tuple = it.next();
            System.out.println(tuple);
        }
    }


    /**
     * Odometer-style iterator: {@code tuple} always holds the next tuple to
     * hand out, or {@code null} once the product is exhausted.
     */
    class CrossProductIterator<InE> implements Iterator<List<InE>>
    {
        private CrossProduct<InE> cp;
        private List<Iterator<InE>> iterators;
        private List<InE> tuple;

        public CrossProductIterator(CrossProduct<InE> cp)
        {
            this.cp = cp;

            iterators = new ArrayList<Iterator<InE>>();
            List<InE> first = new ArrayList<InE>();
            boolean anyEmpty = false;

            for( Iterable<InE> component : cp.getCollections() ) {
                Iterator<InE> it = component.iterator();
                if( !it.hasNext() ) {
                    // An empty component makes the whole product empty;
                    // calling next() here would throw instead.
                    anyEmpty = true;
                    break;
                }
                iterators.add(it);
                first.add(it.next());
            }

            tuple = anyEmpty ? null : first;
        }

        @Override
        public void remove()
        {
            // There are no reasonable semantics for "remove" since the
            // "underlying collection" is never actually instantiated.
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean hasNext()
        {
            return tuple != null;
        }

        /**
         * Advances position {@code i}; when that position is exhausted it is
         * reset to its first value and the carry moves on to position
         * {@code i + 1}.
         *
         * @return {@code true} when the carry ran past the last position,
         *         i.e. every tuple has been produced
         */
        boolean incrementFirstAvailable(int i)
        {
            if( i == iterators.size() ) {
                return true;
            }
            else if( iterators.get(i).hasNext() ) {
                tuple.set(i, iterators.get(i).next());
                return false;
            }
            else {
                iterators.set(i, cp.getCollections().get(i).iterator());
                tuple.set(i, iterators.get(i).next());
                return incrementFirstAvailable(i+1);
            }
        }

        @Override
        public List<InE> next()
        {
            if( tuple == null ) {
                throw new NoSuchElementException();
            }

            // Hand out a copy: the working tuple is mutated in place below.
            List<InE> result = new ArrayList<InE>(tuple);

            if( incrementFirstAvailable(0) ) {
                tuple = null;
            }

            return result;
        }
    }

}
- * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package uk.org.taverna.platform.execution.impl.hadoop; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; - -/** - * An input format that receives an input directory containing a number of directories with input files - * for each input port to a Taverna processor/activity that will be executed as part of this - * MapReduce job. Mapping between directory name -> Taverna processor/activity input port name - * is carried in the job's Context. 
- * - * @author Alex Nenadic - * - */ -public class CrossProductInputFormat extends - FileInputFormat<Text, TextArrayWritable> { - - private static final Log Logger = LogFactory.getLog(CrossProductInputFormat.class); - - // Do not split files into blocks - @Override - protected boolean isSplitable(JobContext context, Path filename) { - return false; - } - - @Override - public RecordReader<Text, TextArrayWritable> createRecordReader( - InputSplit split, TaskAttemptContext context) { - return new CrossProductRecordReader(); - } - - @Override - public List<InputSplit> getSplits(JobContext job) throws IOException { - - // Generate splits. Split is a list of directories where each directory - // contains inputs for one input port of the Taverna processor/activity we - // are invoking. - // We will have only one split for cross product that will know about all - // the files in all input directories and will generate RecordReaders - // for every combination of files inside these directories. -// CrossProductInputSplit split = new CrossProductInputSplit(); - - // List the input port directories contained in the input directory passed - // in from the command line. 
- List<FileStatus> inputPortDirectories = listStatus(job); - - final FileSystem fs = job.getWorkingDirectory().getFileSystem(job.getConfiguration()); - Path workingDirectory = job.getWorkingDirectory(); - System.out.println("Working directory: " + workingDirectory); - System.out.println("Adding directories to the cross product split:"); - ArrayList<Path> inputPortDirectoriesPaths = new ArrayList<Path>(); - for (FileStatus inputPortDirectory : inputPortDirectories){ - // TODO input port directories need to be ordered in the order of the - // input ports of the Taverna processor/activity they are going into - - //inputPortDirectoriesPaths.add(new Text(inputPortDirectory.getPath().toString())); - inputPortDirectoriesPaths.add(inputPortDirectory.getPath()); - System.out.println(inputPortDirectory.getPath()); - - } - CrossProductInputSplit split = new CrossProductInputSplit(workingDirectory, inputPortDirectoriesPaths); - - - List<InputSplit> splits = new ArrayList<InputSplit>(); - splits.add(split); - - return splits; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputSplit.java ---------------------------------------------------------------------- diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputSplit.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputSplit.java deleted file mode 100644 index 2ff9113..0000000 --- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputSplit.java +++ /dev/null @@ -1,88 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2011 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as 
appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package uk.org.taverna.platform.execution.impl.hadoop; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; - -/** - * - * - * @author Alex Nenadic - */ -public class CrossProductInputSplit extends FileSplit { - // - // private long length = 0; - // private String[] hosts; - private List<Path> inputPortDirectories; - private Path workingDirectory; - - public CrossProductInputSplit() { - super(null,0,0,null); - inputPortDirectories = new ArrayList<Path>(); - System.out.println("Calling default constructor for cross product split"); - } - - public CrossProductInputSplit(Path workingDirectory, List<Path> inputPortDirectories) { - // this.length = length; - // this.hosts = hosts; - super(workingDirectory, 0, 0, new String[0]); - this.workingDirectory = workingDirectory; - this.inputPortDirectories = inputPortDirectories; - System.out.println("Calling non-default constructor for cross product split"); - } - - public void 
addInputPortDirectory(Path path) { - inputPortDirectories.add(path); - } - - public List<Path> getInputPortDirectories() { - return inputPortDirectories; - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - Text.writeString(out, workingDirectory.toString()); - out.writeInt(inputPortDirectories.size()); - for (Path path : inputPortDirectories) { - Text.writeString(out, path.toString()); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - workingDirectory = new Path(Text.readString(in)); - int length = in.readInt(); - for (int i = 0; i < length; i++) { - inputPortDirectories.add(new Path(Text.readString(in))); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductRecordReader.java ---------------------------------------------------------------------- diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductRecordReader.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductRecordReader.java deleted file mode 100644 index 6602f55..0000000 --- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductRecordReader.java +++ /dev/null @@ -1,112 +0,0 @@ -package uk.org.taverna.platform.execution.impl.hadoop; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.pingel.util.CrossProduct; 
- -public class CrossProductRecordReader extends RecordReader<Text, TextArrayWritable>{ - - private static final Log Logger = LogFactory.getLog(CrossProductRecordReader.class); - - // Input directories (one for each port) containing files that are used - // as inputs to Taverna processor/activity - private List<Path> inputPortDirectories; - - private CrossProduct<String> crossProduct ; - - private Iterator<List<String>> crossProductIterator; - - private List<String> currentIndexes; - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - - System.out.println("Inside record reader's initialize"); - - CrossProductInputSplit crossProductSplit = (CrossProductInputSplit)split; - inputPortDirectories = crossProductSplit.getInputPortDirectories(); - System.out.println("Record reader received " + +inputPortDirectories.size() + " input port directories"); - - List<List<String>> iterables = new ArrayList<List<String>>(); - for (int i=0; i<inputPortDirectories.size();i++ ){ - - Path inputPortDirectory = inputPortDirectories.get(i); - //Path inputPortDirectory = inputPortDirectories.get(i); - FileStatus[] files = inputPortDirectory.getFileSystem(context.getConfiguration()).listStatus(inputPortDirectory); - List<String> fileNames = new ArrayList<String>(); - for (FileStatus file : files){ - fileNames.add(file.getPath().getName()); - } - iterables.add(fileNames); - } - - crossProduct = new CrossProduct<String>(iterables); - crossProductIterator = crossProduct.iterator(); - - } - - @Override - public boolean nextKeyValue(){ - - boolean hasNextKey = crossProductIterator.hasNext(); - System.out.println("Has record reader next key value? 
" + hasNextKey); - if (hasNextKey){ - currentIndexes = crossProductIterator.next(); - } - return hasNextKey; - } - - @Override - public Text getCurrentKey() throws IOException, InterruptedException { - - StringBuffer sb = new StringBuffer(); - for (String index : currentIndexes){ - sb.append(index + "."); - } - // Remove last "." - String indexesString = sb.toString(); - System.out.println("Get current key: " + indexesString); - if (indexesString.contains(".")){ - indexesString = indexesString.substring(0, indexesString.length() - 1); - } - return new Text(indexesString); - } - - @Override - public TextArrayWritable getCurrentValue() { - - TextArrayWritable arrayWritable = new TextArrayWritable(); - Text[] array = new Text[currentIndexes.size()]; - for(int i= 0; i< currentIndexes.size(); i++){ - Path file = new Path(inputPortDirectories.get(i).toString(), currentIndexes.get(i)); - array[i] = new Text(file.toString()); - } - arrayWritable.set(array); - return arrayWritable; - } - - @Override - public float getProgress() throws IOException, InterruptedException { - // TODO Auto-generated method stub - return 0; - } - - @Override - public void close() throws IOException { - // TODO Auto-generated method stub - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductTest.java ---------------------------------------------------------------------- diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductTest.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductTest.java deleted file mode 100644 index e5dc063..0000000 --- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/******************************************************************************* - * 
Copyright (C) 2011 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package uk.org.taverna.platform.execution.impl.hadoop; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -public class CrossProductTest extends Configured implements Tool { - - public static class Map extends Mapper<Text, TextArrayWritable, Text, TextArrayWritable> { - public void map(Text key, TextArrayWritable 
value, Context context) throws IOException, - InterruptedException { - System.out.println("Map key = " + key); - System.out.println("Map value = " ); - - for (int i = 0; i < value.get().length; i++){ - System.out.println(" " + value.get()[i]); - } - - context.write(key, value); - } - } - - public static class Reduce extends Reducer<Text, TextArrayWritable, Text, Text> { - public void reduce(Text key, Iterable<TextArrayWritable> values, Context context) - throws IOException, InterruptedException { - - System.out.println("Reduce key = " + key); - context.write(key, f(values)); - } - - private Text f(Iterable<TextArrayWritable> values) { - StringBuilder sb = new StringBuilder(); - - // There should be only one array - TextArrayWritable arrayValue = values.iterator().next(); - - for (int i = 0; i < arrayValue.get().length; i++){ - sb.append(arrayValue.get()[i] + "\nx"); - } - String str = sb.toString(); - if (str.contains("\nx")){ - str = str.substring(0, sb.lastIndexOf("\nx") -1); - } - System.out.println("Result of function f(): " + str); - - return new Text(str); - } - } - - public int run(String[] args) throws Exception { - - Configuration configuration = getConf(); - configuration.set("taverna.datalinks", "A|X,B|Y"); - System.out.println(configuration); - Job job = new Job(configuration); - job.setJarByClass(CrossProductTest.class); - job.setJobName("crossproduct"); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(TextArrayWritable.class); - - job.setMapperClass(Map.class); -// job.setCombinerClass(Reduce.class); - job.setReducerClass(Reduce.class); - - job.setInputFormatClass(CrossProductInputFormat.class); - job.setOutputFormatClass(TextOutputFormat.class); - - FileInputFormat.setInputPaths(job, new Path(args[0])); - System.out.println("Input dir: " + args[0]); - FileOutputFormat.setOutputPath(job, new Path(args[1])); - System.out.println("Output dir: " + args[1]); - - boolean success = job.waitForCompletion(true); - return success ? 
0 : 1; - } - - public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(new CrossProductTest(), args); - System.exit(ret); - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/DotProductTest.java ---------------------------------------------------------------------- diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/DotProductTest.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/DotProductTest.java deleted file mode 100644 index 2fa38e4..0000000 --- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/DotProductTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2011 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. 
- * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package uk.org.taverna.platform.execution.impl.hadoop; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -public class DotProductTest extends Configured implements Tool { - - public static class Map extends Mapper<LongWritable, MapWritable, LongWritable, MapWritable> { - public void map(LongWritable key, MapWritable value, Context context) throws IOException, - InterruptedException { - System.out.println("Map key = " + key); - System.out.println("Map value tag = " + value.get(new Text("tag"))); - System.out.println("Map value record = " + value.get(new Text("record"))); - context.write(key, value); - } - } - - public static class Reduce extends Reducer<LongWritable, MapWritable, LongWritable, Text> { - public void reduce(LongWritable key, Iterable<MapWritable> values, Context context) - throws IOException, InterruptedException { - - System.out.println("Reduce key = " + key); - context.write(key, f(values)); - context.write(key, f(values)); - } - - private Text f(Iterable<MapWritable> values) { - 
StringBuilder sb = new StringBuilder(); - for (MapWritable value : values) { - System.out.println("Reduce tag = " + value.get(new Text("tag"))); - System.out.println("Reduce value = " + value.get(new Text("record"))); - sb.append(value.get(new Text("record")) + " "); - } - return new Text(sb.toString()); - } - } - - public int run(String[] args) throws Exception { - java.util.Map datalinks = new HashMap(); - - - Configuration configuration = getConf(); - configuration.set("taverna.datalinks", "A|X,B|Y"); - System.out.println(configuration); - Job job = new Job(configuration); - job.setJarByClass(DotProductTest.class); - job.setJobName("dotproduct"); - - job.setOutputKeyClass(LongWritable.class); - job.setOutputValueClass(MapWritable.class); - - job.setMapperClass(Map.class); -// job.setCombinerClass(Reduce.class); - job.setReducerClass(Reduce.class); - - job.setInputFormatClass(TavernaInputFormat.class); - job.setOutputFormatClass(TextOutputFormat.class); - - FileInputFormat.setInputPaths(job, new Path(args[0])); - FileOutputFormat.setOutputPath(job, new Path(args[1])); - - boolean success = job.waitForCompletion(true); - return success ? 
0 : 1; - } - - public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(new DotProductTest(), args); - System.exit(ret); - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputFormat.java ---------------------------------------------------------------------- diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputFormat.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputFormat.java deleted file mode 100644 index 57b41c5..0000000 --- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputFormat.java +++ /dev/null @@ -1,52 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2011 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. 
- * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package uk.org.taverna.platform.execution.impl.hadoop; - -import java.io.IOException; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; - -/** - * - * - * @author David Withers - */ -public class TavernaInputFormat extends FileInputFormat<LongWritable, MapWritable> { - - @Override - public RecordReader<LongWritable, MapWritable> createRecordReader(InputSplit split, - TaskAttemptContext context) throws IOException, InterruptedException { - return new TavernaRecordReader(); - } - - @Override - protected boolean isSplitable(JobContext context, Path filename) { - return false; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputSplit.java ---------------------------------------------------------------------- diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputSplit.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputSplit.java deleted file mode 100644 index d1bf0b3..0000000 --- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputSplit.java +++ /dev/null @@ -1,69 +0,0 @@ -/******************************************************************************* - * 
Copyright (C) 2011 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package uk.org.taverna.platform.execution.impl.hadoop; - -import java.io.IOException; -import java.util.Map; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputSplit; - -/** - * - * - * @author David Withers - */ -public class TavernaInputSplit extends InputSplit { - private int[] index; - private Map<String, Path> inputs; - private long length; - private String[] hosts; - - public TavernaInputSplit(int[] index, Map<String, Path> inputs, long length, String[] hosts) { - this.index = index; - this.inputs = inputs; - this.length = length; - this.hosts = hosts; - } - - public int[] getIndex() { - return index; - } - - public Map<String, Path> getInputs() { - return inputs; - } - - @Override - public long getLength() throws IOException, InterruptedException { - return length; - } - - @Override - public String[] getLocations() throws IOException, InterruptedException { - if (hosts == null) { - return new String[] {}; - } else { - return this.hosts; - } - } 
- -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaMapper.java ---------------------------------------------------------------------- diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaMapper.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaMapper.java deleted file mode 100644 index fa5d1dc..0000000 --- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaMapper.java +++ /dev/null @@ -1,75 +0,0 @@ -package uk.org.taverna.platform.execution.impl.hadoop; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Reducer.Context; - -public class TavernaMapper extends org.apache.hadoop.mapreduce.Mapper<int[], Map<String, Path>, Object, Object> { - - private org.apache.hadoop.mapreduce.Mapper.Context context; - - @Override - protected void setup(Context context) throws IOException, - InterruptedException { - this.context = context; - } - - @Override - protected void map(int[] key, Map<String, Path> value, - Context context) throws IOException, InterruptedException { - - // Value contains a map of input ports to data values on those ports - // (i.e. 
file paths to data on the input ports) - - - // Get the activity and invoke it with the passed inputs per port -// -// String activityClassName = context.getConfiguration().get("taverna.activity.class"); -// String activityConfigurationXML = context.getConfiguration().get("taverna.activity.configuration"); -// -// ClassLoader classLoader = this.getClass().getClassLoader(); -// Class<?> activityClass = null; -// AbstractAsynchronousActivity<?> activity = null; -// try { -// activityClass = classLoader.loadClass(activityClassName); -// activity = (AbstractAsynchronousActivity<?>) activityClass.newInstance(); -// } catch (ClassNotFoundException e) { -// // TODO Auto-generated catch block -// e.printStackTrace(); -// } catch (InstantiationException e) { -// // TODO Auto-generated catch block -// e.printStackTrace(); -// } catch (IllegalAccessException e) { -// // TODO Auto-generated catch block -// e.printStackTrace(); -// } -// -// activity.configure(activityConfigurationXML); -// activity.executeAsynch(data, callback); - - /* Print the index values; concatenating the int[] directly would print its identity string. */ - System.out.println("Index: " + java.util.Arrays.toString(key)); - - // Input port names - for (Map.Entry<String, Path> input : value.entrySet()) { - String inputPortName = input.getKey(); - // Simply read values from input files and concatenate them - Path inputFilePath = input.getValue(); - /* Resolve the file system using the configuration of the running task. */ - FileSystem fileSystem = inputFilePath.getFileSystem(context.getConfiguration()); - FSDataInputStream fileInputStream = fileSystem.open(inputFilePath); - try { - //fileInputStream. - System.out.println("Input port: " + inputPortName + ". Input value: "+ inputFilePath +"."); - } finally { - /* Release the stream on every path; nothing is read from it yet. */ - fileInputStream.close(); - } - } - - // Map of output ports to data values on those ports - // (i.e. 
file paths to data on the output ports) - Map<String, Path> outputValue = new HashMap<String, Path>(); - context.write(key, outputValue); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java ---------------------------------------------------------------------- diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java deleted file mode 100644 index 190d91a..0000000 --- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java +++ /dev/null @@ -1,106 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2011 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. 
- * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package uk.org.taverna.platform.execution.impl.hadoop; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; - -/** - * - * - * @author David Withers - */ -public class TavernaRecordReader extends RecordReader<LongWritable, MapWritable> { - - private FileSplit fileSplit; - private String recordName; - private FileStatus[] files; - private int index = -1; - private Map<String, String> datalinks; - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - fileSplit = (FileSplit) split; - Path path = fileSplit.getPath(); - recordName = path.getName(); - files = path.getFileSystem(context.getConfiguration()).listStatus(path); - setDatalinks(context); - } - - /** - * @param context - */ - private void setDatalinks(TaskAttemptContext context) { - datalinks = new HashMap<String, String>(); - String datalinkConfig = context.getConfiguration().get("taverna.datalinks"); - if (datalinkConfig != null) { - String[] datalinksSplit = datalinkConfig.split(","); - for (String datalink : datalinksSplit) { - String[] split = datalink.split("\\|"); - if (split.length == 2) { - datalinks.put(split[0], split[1]); - } - } - } - } - - @Override - public boolean nextKeyValue() throws IOException, 
InterruptedException { - index++; - return index < files.length; - } - - @Override - public LongWritable getCurrentKey() throws IOException, InterruptedException { - return new LongWritable(Long.valueOf(files[index].getPath().getName())); - } - - @Override - public MapWritable getCurrentValue() throws IOException, InterruptedException { - MapWritable mapWritable = new MapWritable(); - mapWritable.put(new Text("tag"), new Text(datalinks.get(recordName))); - mapWritable.put(new Text("record"), new Text(files[index].getPath().toString())); - return mapWritable; - } - - /** - * Reports how far this reader has advanced through the listed files. - * The ratio is computed in floating point, because dividing two ints - * truncates every intermediate value to 0. - * - * @return 1 when there are no files; otherwise (index + 1) / files.length, capped at 1 - */ - @Override - public float getProgress() throws IOException, InterruptedException { - if (files.length == 0) { - return 1f; - } - return Math.min(1f, (float) (index + 1) / files.length); - } - - @Override - public void close() throws IOException { - - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaReducer.java ---------------------------------------------------------------------- diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaReducer.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaReducer.java deleted file mode 100644 index 11f85dd..0000000 --- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaReducer.java +++ /dev/null @@ -1,24 +0,0 @@ -package uk.org.taverna.platform.execution.impl.hadoop; - -import java.io.IOException; -import java.util.Map; - -import org.apache.hadoop.fs.Path; - -public class TavernaReducer extends - org.apache.hadoop.mapreduce.Reducer<int[], Map<String, Path>, Object, Object> { - - private Context context; - - @Override - protected void setup(Context context) throws IOException, - InterruptedException { - this.context = context; - } - - @Override - protected void reduce(int[] key, Iterable<Map<String, Path>> values, - Context context) throws 
IOException, InterruptedException { - } - -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/Test.java ---------------------------------------------------------------------- diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/Test.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/Test.java deleted file mode 100644 index 65a75d8..0000000 --- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/Test.java +++ /dev/null @@ -1,69 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2011 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. 
- * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package uk.org.taverna.platform.execution.impl.hadoop; - -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -/** - * - * - * @author David Withers - */ -public class Test extends Configured implements Tool { - - @Override - public int run(String[] args) throws Exception { - Configuration configuration = getConf(); - Job job = new Job(configuration); - job.setJarByClass(Test.class); - job.setJobName("wordcount"); - - job.setOutputKeyClass(int[].class); - job.setOutputValueClass(Map.class); - - job.setMapperClass(TavernaMapper.class); -// job.setCombinerClass(Reduce.class); - job.setReducerClass(TavernaReducer.class); - - job.setInputFormatClass(TavernaInputFormat.class); - job.setOutputFormatClass(TextOutputFormat.class); - - TavernaInputFormat.setInputPaths(job, new Path(args[0])); - FileOutputFormat.setOutputPath(job, new Path(args[1])); - - boolean success = job.waitForCompletion(true); - return success ? 
0 : 1; - } - - public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(new Test(), args); - System.exit(ret); - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TextArrayWritable.java ---------------------------------------------------------------------- diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TextArrayWritable.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TextArrayWritable.java deleted file mode 100644 index 1b64c77..0000000 --- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TextArrayWritable.java +++ /dev/null @@ -1,31 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2011 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. 
- * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package uk.org.taverna.platform.execution.impl.hadoop; - -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.Text; - -public class TextArrayWritable extends ArrayWritable { - public TextArrayWritable() { - super(Text.class); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-impl/pom.xml ---------------------------------------------------------------------- diff --git a/taverna-execution-impl/pom.xml b/taverna-execution-impl/pom.xml deleted file mode 100644 index ec7a6b0..0000000 --- a/taverna-execution-impl/pom.xml +++ /dev/null @@ -1,42 +0,0 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.taverna.engine</groupId> - <artifactId>taverna-engine</artifactId> - <version>3.1.0-incubating-SNAPSHOT</version> - </parent> - <artifactId>taverna-execution-impl</artifactId> - <packaging>bundle</packaging> - <name>Apache Taverna Platform Execution Service Implementation</name> - <description>A Service for executing Taverna workflows</description> - <build> - <plugins> - <plugin> - <groupId>org.apache.felix</groupId> - <artifactId>maven-bundle-plugin</artifactId> - <configuration> - <instructions> - <Import-Package>uk.org.taverna.platform.execution.api;provide:=true,*</Import-Package> - </instructions> - </configuration> - </plugin> - </plugins> - </build> - <dependencies> - <dependency> - <groupId>${project.parent.groupId}</groupId> - 
<artifactId>taverna-execution-api</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>org.apache.taverna.language</groupId> - <artifactId>taverna-scufl2-api</artifactId> - <version>${taverna.language.version}</version> - </dependency> - <dependency> - <groupId>org.apache.taverna.language</groupId> - <artifactId>taverna-databundle</artifactId> - <version>${taverna.language.version}</version> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-impl/src/main/java/uk/org/taverna/platform/execution/impl/ExecutionEnvironmentServiceImpl.java ---------------------------------------------------------------------- diff --git a/taverna-execution-impl/src/main/java/uk/org/taverna/platform/execution/impl/ExecutionEnvironmentServiceImpl.java b/taverna-execution-impl/src/main/java/uk/org/taverna/platform/execution/impl/ExecutionEnvironmentServiceImpl.java deleted file mode 100644 index 13eeec2..0000000 --- a/taverna-execution-impl/src/main/java/uk/org/taverna/platform/execution/impl/ExecutionEnvironmentServiceImpl.java +++ /dev/null @@ -1,354 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2011 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. 
- * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package uk.org.taverna.platform.execution.impl; - -import java.net.URI; -import java.text.MessageFormat; -import java.util.HashSet; -import java.util.Set; -import java.util.logging.Logger; - -import uk.org.taverna.platform.capability.api.ActivityConfigurationException; -import uk.org.taverna.platform.capability.api.ActivityNotFoundException; -import uk.org.taverna.platform.capability.api.DispatchLayerConfigurationException; -import uk.org.taverna.platform.capability.api.DispatchLayerNotFoundException; -import uk.org.taverna.platform.execution.api.ExecutionEnvironment; -import uk.org.taverna.platform.execution.api.ExecutionEnvironmentService; -import uk.org.taverna.platform.execution.api.ExecutionService; -import uk.org.taverna.scufl2.api.activity.Activity; -import uk.org.taverna.scufl2.api.common.NamedSet; -import uk.org.taverna.scufl2.api.common.Scufl2Tools; -import uk.org.taverna.scufl2.api.configurations.Configuration; -import uk.org.taverna.scufl2.api.core.Processor; -import uk.org.taverna.scufl2.api.profiles.ProcessorBinding; -import uk.org.taverna.scufl2.api.profiles.Profile; - -import com.fasterxml.jackson.databind.JsonNode; - -/** - * Implementation of the ExecutionEnvironmentService. 
- * - * @author David Withers - */ -public class ExecutionEnvironmentServiceImpl implements ExecutionEnvironmentService { - private static final Logger logger = Logger.getLogger(ExecutionEnvironmentServiceImpl.class.getName()); - - @SuppressWarnings("unused") - private final Scufl2Tools scufl2Tools = new Scufl2Tools(); - private Set<ExecutionService> executionServices; - - @Override - public Set<ExecutionEnvironment> getExecutionEnvironments() { - Set<ExecutionEnvironment> executionEnvironments = new HashSet<>(); - for (ExecutionService executionService : executionServices) - executionEnvironments.addAll(executionService - .getExecutionEnvironments()); - return executionEnvironments; - } - - @Override - public Set<ExecutionEnvironment> getExecutionEnvironments(Profile profile) { - Set<ExecutionEnvironment> validExecutionEnvironments = new HashSet<>(); - for (ExecutionEnvironment executionEnvironment : getExecutionEnvironments()) - if (isValidExecutionEnvironment(executionEnvironment, profile)) - validExecutionEnvironments.add(executionEnvironment); - return validExecutionEnvironments; - } - - /** - * Sets the ExecutionServices that will be used to find ExecutionEnvironments. 
- * - * @param executionServices - * the ExecutionServices that will be used to find ExecutionEnvironments - */ - public void setExecutionServices(Set<ExecutionService> executionServices) { - this.executionServices = executionServices; - } - - /** - * @param executionEnvironment - * @param profile - * @return - */ - private boolean isValidExecutionEnvironment(ExecutionEnvironment executionEnvironment, - Profile profile) { - NamedSet<ProcessorBinding> processorBindings = profile.getProcessorBindings(); - for (ProcessorBinding processorBinding : processorBindings) { - Activity activity = processorBinding.getBoundActivity(); - if (!executionEnvironment.activityExists(activity.getType())) { - logger.fine(MessageFormat.format("{0} does not contain activity {1}", - executionEnvironment.getName(), activity.getType())); - return false; - } - Configuration activityConfiguration = activity.getConfiguration(); - if (!isValidActivityConfiguration(executionEnvironment, activityConfiguration, activity)) { - logger.fine(MessageFormat.format("Invalid activity configuration for {1} in {0}", - executionEnvironment.getName(), activity.getType())); - return false; - } - @SuppressWarnings("unused") - Processor processor = processorBinding.getBoundProcessor(); - // TODO check that environment has required dispatch layers for processor configuration -// for (DispatchStackLayer dispatchStackLayer : processor.getDispatchStack()) { -// if (!executionEnvironment.dispatchLayerExists(dispatchStackLayer -// .getType())) { -// logger.fine(MessageFormat.format("{0} does not contain dispatch layer {1}", -// executionEnvironment.getName(), -// dispatchStackLayer.getType())); -// return false; -// } -// -// List<Configuration> dispatchLayerConfigurations = scufl2Tools.configurationsFor(dispatchStackLayer, profile); -// if (dispatchLayerConfigurations.size() > 1) { -// logger.fine(MessageFormat.format("{0} contains multiple configurations for dispatch layer {1}", -// executionEnvironment.getName(), 
-// dispatchStackLayer.getType())); -// } else if (dispatchLayerConfigurations.size() == 1) { -// if (!isValidDispatchLayerConfiguration(executionEnvironment, dispatchLayerConfigurations.get(0), dispatchStackLayer)) { -// logger.fine(MessageFormat.format("Invalid dispatch layer configuration for {1} in {0}", -// executionEnvironment.getName(), dispatchStackLayer.getType())); -// return false; -// } -// } -// } - } - return true; - } - - private boolean isValidActivityConfiguration(ExecutionEnvironment executionEnvironment, - Configuration configuration, Activity activity) { - try { - configuration.getJson(); - configuration.getJsonSchema(); - @SuppressWarnings("unused") - JsonNode environmentSchema = executionEnvironment.getActivityConfigurationSchema(activity.getType()); - // TODO validate against schema - } catch (ActivityNotFoundException e) { - logger.fine(MessageFormat.format("{0} does not contain activity {1}", - executionEnvironment.getName(), activity.getType())); - return false; - } catch (ActivityConfigurationException e) { - logger.fine(MessageFormat.format("Configuration for {1} is incorrect in {0}", - executionEnvironment.getName(), activity.getType())); - return false; - } - return true; - } - - @SuppressWarnings("unused") - private boolean isValidDispatchLayerConfiguration(ExecutionEnvironment executionEnvironment, - Configuration configuration, URI dispatchLayerType) { - try { - JsonNode environmentSchema = executionEnvironment.getDispatchLayerConfigurationSchema(dispatchLayerType); - // TODO validate against schema - } catch (DispatchLayerNotFoundException e) { - logger.fine(MessageFormat.format("{0} does not contain dispatch layer {1}", - executionEnvironment.getName(), dispatchLayerType)); - return false; - } catch (DispatchLayerConfigurationException e) { - logger.fine(MessageFormat.format("Configuration for {1} is incorrect in {0}", - executionEnvironment.getName(), dispatchLayerType)); - return false; - } - return true; - } - -// /** -// * 
@param propertyResourceDefinition -// * @param propertyResource -// * @return -// */ -// private boolean isValidPropertyResource(Configuration configuration, -// PropertyResourceDefinition propertyResourceDefinition, PropertyResource propertyResource) { -// if (!propertyResourceDefinition.getTypeURI().equals(propertyResource.getTypeURI())) { -// logger.fine(MessageFormat.format( -// "Property type {0} does not equal property definition type {1}", -// propertyResource.getTypeURI(), propertyResourceDefinition.getTypeURI())); -// return false; -// } -// List<PropertyDefinition> propertyDefinitions = propertyResourceDefinition -// .getPropertyDefinitions(); -// Map<URI, SortedSet<PropertyObject>> properties = propertyResource.getProperties(); -// for (PropertyDefinition propertyDefinition : propertyDefinitions) { -// SortedSet<PropertyObject> propertySet = properties.get(propertyDefinition -// .getPredicate()); -// if (propertySet == null) { -// if (propertyDefinition.isRequired()) { -// logger.fine(MessageFormat.format("Required property {0} is missing", -// propertyDefinition.getPredicate())); -// return false; -// } -// } else { -// if (propertySet.size() == 0 && propertyDefinition.isRequired()) { -// logger.fine(MessageFormat.format("Required property {0} is missing", -// propertyDefinition.getPredicate())); -// return false; -// } -// if (propertySet.size() > 1 && !propertyDefinition.isMultiple()) { -// logger.fine(MessageFormat.format( -// "{0} properties found for singleton property {1}", propertySet.size(), -// propertyDefinition.getPredicate())); -// return false; -// } -// if (propertySet.size() > 1 && propertyDefinition.isMultiple() && propertyDefinition.isOrdered()) { -// logger.fine(MessageFormat.format( -// "{0} property lists found for property {1}", propertySet.size(), -// propertyDefinition.getPredicate())); -// return false; -// } -// for (PropertyObject property : propertySet) { -// if (propertyDefinition.isMultiple() && 
propertyDefinition.isOrdered()) { -// if (property instanceof PropertyList) { -// PropertyList propertyList = (PropertyList) property; -// for (PropertyObject propertyObject : propertyList) { -// if (!isValidProperty(configuration, propertyDefinition, propertyObject)) { -// logger.fine(MessageFormat.format("Property {0} is invalid", -// propertyDefinition.getPredicate())); -// return false; -// } -// } -// } -// -// } else if (!isValidProperty(configuration, propertyDefinition, property)) { -// logger.fine(MessageFormat.format("Property {0} is invalid", -// propertyDefinition.getPredicate())); -// return false; -// } -// } -// } -// } -// return true; -// } -// -// /** -// * @param propertyDefinition -// * @param property -// * @return -// */ -// private boolean isValidProperty(Configuration configuration, -// PropertyDefinition propertyDefinition, PropertyObject property) { -// if (propertyDefinition instanceof PropertyLiteralDefinition) { -// if (property instanceof PropertyLiteral) { -// PropertyLiteralDefinition propertyLiteralDefinition = (PropertyLiteralDefinition) propertyDefinition; -// PropertyLiteral propertyLiteral = (PropertyLiteral) property; -// if (!propertyLiteral.getLiteralType().equals( -// propertyLiteralDefinition.getLiteralType())) { -// logger.fine(MessageFormat.format( -// "Property type {0} does not equal property definition type {1}", -// propertyLiteral.getLiteralType(), -// propertyLiteralDefinition.getLiteralType())); -// return false; -// } -// LinkedHashSet<String> options = propertyLiteralDefinition.getOptions(); -// if (options != null && options.size() > 0) { -// if (!options.contains(propertyLiteral.getLiteralValue())) { -// logger.fine(MessageFormat.format("Property value {0} is not permitted", -// propertyLiteral.getLiteralValue())); -// return false; -// } -// } -// } else { -// logger.fine(MessageFormat.format("Expected a PropertyLiteral but got a {0}", -// property.getClass().getSimpleName())); -// return false; -// } -// } 
else if (propertyDefinition instanceof PropertyReferenceDefinition) { -// if (property instanceof PropertyReference) { -// PropertyReferenceDefinition propertyReferenceDefinition = (PropertyReferenceDefinition) propertyDefinition; -// PropertyReference propertyReference = (PropertyReference) property; -// LinkedHashSet<URI> options = propertyReferenceDefinition.getOptions(); -// if (options != null && options.size() > 0) { -// if (!options.contains(propertyReference.getResourceURI())) { -// logger.fine(MessageFormat.format("Property value {0} is not permitted", -// propertyReference.getResourceURI())); -// return false; -// } -// } -// } else { -// logger.fine(MessageFormat.format("Expected a PropertyReference but got a {0}", -// property.getClass().getSimpleName())); -// return false; -// } -// } else if (propertyDefinition instanceof PropertyResourceDefinition) { -// if (property instanceof PropertyResource) { -// PropertyResourceDefinition propertyResourceDefinition = (PropertyResourceDefinition) propertyDefinition; -// PropertyResource propertyResource = (PropertyResource) property; -// return isValidPropertyResource(configuration, propertyResourceDefinition, -// propertyResource); -// } else if (property instanceof PropertyReference) { -// // special cases where a PropertyResource is actually a reference to a WorkflowBundle component -// PropertyReference propertyReference = (PropertyReference) property; -// WorkflowBundle workflowBundle = scufl2Tools.findParent(WorkflowBundle.class, -// configuration); -// URI configUri = uriTools.uriForBean(configuration); -// URI referenceUri = configUri.resolve(propertyReference.getResourceURI()); -// if (workflowBundle != null) { -// URI predicate = propertyDefinition.getPredicate(); -// WorkflowBean workflowBean = uriTools.resolveUri(referenceUri, workflowBundle); -// if (workflowBean == null) { -// logger.fine(MessageFormat.format( -// "Cannot resolve {0} in WorkflowBundle {1}", -// propertyReference.getResourceURI(), 
workflowBundle.getName())); -// } -// if (predicate.equals(SCUFL2.resolve("#definesInputPort"))) { -// if (workflowBean == null) { -// return false; -// } -// if (!(workflowBean instanceof InputActivityPort)) { -// logger.fine(MessageFormat.format( -// "{0} resolved to a {1}, expected a InputActivityPort", -// propertyReference.getResourceURI(), workflowBean.getClass() -// .getSimpleName())); -// return false; -// } -// } else if (predicate.equals(SCUFL2.resolve("#definesOutputPort"))) { -// if (workflowBean == null) { -// return false; -// } -// if (!(workflowBean instanceof OutputActivityPort)) { -// logger.fine(MessageFormat.format( -// "{0} resolved to a {1}, expected a OutputActivityPort", -// propertyReference.getResourceURI(), workflowBean.getClass() -// .getSimpleName())); -// return false; -// } -// } else { -// logger.fine(MessageFormat.format("Unexpected reference to {0}", predicate)); -// } -// } else { -// logger.fine(MessageFormat -// .format("Cannot resolve reference to {0} because Configuration {1} not contained within a WorkflowBundle", -// referenceUri, configuration.getName())); -// } -// } else { -// logger.fine(MessageFormat.format("Expected a PropertyResource or PropertyReference but got a {0}", -// property.getClass().getSimpleName())); -// return false; -// } -// } else { -// logger.fine(MessageFormat.format("Unknown propery definition class {0}", -// propertyDefinition.getClass().getSimpleName())); -// return false; -// } -// return true; -// } - -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-impl/src/main/resources/META-INF/spring/execution-context-osgi.xml ---------------------------------------------------------------------- diff --git a/taverna-execution-impl/src/main/resources/META-INF/spring/execution-context-osgi.xml b/taverna-execution-impl/src/main/resources/META-INF/spring/execution-context-osgi.xml deleted file mode 100644 index 87eef58..0000000 --- 
a/taverna-execution-impl/src/main/resources/META-INF/spring/execution-context-osgi.xml +++ /dev/null @@ -1,13 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<beans:beans xmlns="http://www.springframework.org/schema/osgi" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xmlns:beans="http://www.springframework.org/schema/beans" - xsi:schemaLocation="http://www.springframework.org/schema/beans - http://www.springframework.org/schema/beans/spring-beans.xsd - http://www.springframework.org/schema/osgi - http://www.springframework.org/schema/osgi/spring-osgi.xsd"> - - <service ref="executionEnvironmentService" interface="uk.org.taverna.platform.execution.api.ExecutionEnvironmentService" /> - - <set id="executionServices" interface="uk.org.taverna.platform.execution.api.ExecutionService" /> - -</beans:beans> http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-impl/src/main/resources/META-INF/spring/execution-context.xml ---------------------------------------------------------------------- diff --git a/taverna-execution-impl/src/main/resources/META-INF/spring/execution-context.xml b/taverna-execution-impl/src/main/resources/META-INF/spring/execution-context.xml deleted file mode 100644 index 61f5107..0000000 --- a/taverna-execution-impl/src/main/resources/META-INF/spring/execution-context.xml +++ /dev/null @@ -1,10 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://www.springframework.org/schema/beans - http://www.springframework.org/schema/beans/spring-beans.xsd"> - - <bean id="executionEnvironmentService" class="uk.org.taverna.platform.execution.impl.ExecutionEnvironmentServiceImpl"> - <property name="executionServices" ref="executionServices" /> - </bean> - -</beans> http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-local/pom.xml 
---------------------------------------------------------------------- diff --git a/taverna-execution-local/pom.xml b/taverna-execution-local/pom.xml deleted file mode 100644 index 83eea60..0000000 --- a/taverna-execution-local/pom.xml +++ /dev/null @@ -1,90 +0,0 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.taverna.engine</groupId> - <artifactId>taverna-engine</artifactId> - <version>3.1.0-incubating-SNAPSHOT</version> - </parent> - <artifactId>taverna-execution-local</artifactId> - <packaging>bundle</packaging> - <name>Apache Taverna Platform Local Execution Service</name> - <description>An Execution Service for executing Taverna workflows using a local Taverna Dataflow Engine</description> - <build> - <plugins> - <plugin> - <groupId>org.apache.felix</groupId> - <artifactId>maven-bundle-plugin</artifactId> - <configuration> - <instructions> - <Import-Package>uk.org.taverna.platform.execution.api;provide:=true,*</Import-Package> - </instructions> - </configuration> - </plugin> - </plugins> - </build> - <dependencies> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>taverna-capability-api</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>taverna-execution-api</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>taverna-report-api</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>taverna-workflowmodel-api</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - 
<groupId>${project.parent.groupId}</groupId> - <artifactId>taverna-reference-api</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>taverna-reference-types</artifactId> - <version>${project.parent.version}</version> - </dependency> - <!-- - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>taverna-provenanceconnector</artifactId> - <version>${project.parent.version}</version> - </dependency> - --> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>taverna-observer</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>taverna-database-configuration-api</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>org.apache.taverna.language</groupId> - <artifactId>taverna-scufl2-api</artifactId> - <version>${taverna.language.version}</version> - </dependency> - <dependency> - <groupId>org.apache.taverna.language</groupId> - <artifactId>taverna-databundle</artifactId> - <version>${taverna.language.version}</version> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>${junit.version}</version> - <scope>test</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecution.java ---------------------------------------------------------------------- diff --git a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecution.java b/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecution.java deleted file mode 100644 index e176d4d..0000000 --- 
a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecution.java +++ /dev/null @@ -1,243 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2010 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package uk.org.taverna.platform.execution.impl.local; - -import static java.util.logging.Level.SEVERE; -import static uk.org.taverna.platform.execution.impl.local.T2ReferenceConverter.convertPathToObject; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.logging.Logger; - -import net.sf.taverna.t2.facade.ResultListener; -import net.sf.taverna.t2.facade.WorkflowInstanceFacade; -import net.sf.taverna.t2.invocation.InvocationContext; -import net.sf.taverna.t2.invocation.TokenOrderException; -import net.sf.taverna.t2.invocation.WorkflowDataToken; -import 
net.sf.taverna.t2.monitor.MonitorManager; -import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter; -import net.sf.taverna.t2.reference.ReferenceService; -import net.sf.taverna.t2.reference.T2Reference; -import net.sf.taverna.t2.workflowmodel.Dataflow; -import net.sf.taverna.t2.workflowmodel.DataflowInputPort; -import net.sf.taverna.t2.workflowmodel.Edits; -import net.sf.taverna.t2.workflowmodel.InvalidDataflowException; - -import org.purl.wf4ever.robundle.Bundle; - -import uk.org.taverna.databundle.DataBundles; -import uk.org.taverna.platform.capability.api.ActivityService; -import uk.org.taverna.platform.capability.api.DispatchLayerService; -import uk.org.taverna.platform.execution.api.AbstractExecution; -import uk.org.taverna.platform.execution.api.InvalidWorkflowException; -import uk.org.taverna.platform.report.ActivityReport; -import uk.org.taverna.platform.report.ProcessorReport; -import uk.org.taverna.platform.report.WorkflowReport; -import uk.org.taverna.scufl2.api.container.WorkflowBundle; -import uk.org.taverna.scufl2.api.core.Workflow; -import uk.org.taverna.scufl2.api.profiles.Profile; - -/** - * An {@link uk.org.taverna.platform.execution.api.Execution Execution} for - * executing Taverna workflows on a local Taverna Dataflow Engine. - * - * @author David Withers - */ -public class LocalExecution extends AbstractExecution implements ResultListener { - - private static Logger logger = Logger.getLogger(LocalExecution.class - .getName()); - - private final WorkflowToDataflowMapper mapping; - - private final WorkflowInstanceFacade facade; - - private final LocalExecutionMonitor executionMonitor; - - private final ReferenceService referenceService; - - private final Map<String, DataflowInputPort> inputPorts = new HashMap<String, DataflowInputPort>(); - - /** - * Constructs an Execution for executing Taverna workflows on a local - * Taverna Dataflow Engine. 
- * - * @param workflowBundle - * the <code>WorkflowBundle</code> containing the - * <code>Workflow</code>s required for execution - * @param workflow - * the <code>Workflow</code> to execute - * @param profile - * the <code>Profile</code> to use when executing the - * <code>Workflow</code> - * @param dataBundle - * the <code>Bundle</code> containing the data values for the - * <code>Workflow</code> - * @param referenceService - * the <code>ReferenceService</code> used to register inputs, - * outputs and intermediate values - * @throws InvalidWorkflowException - * if the specified workflow is invalid - */ - public LocalExecution(WorkflowBundle workflowBundle, Workflow workflow, - Profile profile, Bundle dataBundle, - ReferenceService referenceService, Edits edits, - ActivityService activityService, - DispatchLayerService dispatchLayerService) - throws InvalidWorkflowException { - super(workflowBundle, workflow, profile, dataBundle); - this.referenceService = referenceService; - try { - mapping = new WorkflowToDataflowMapper(workflowBundle, profile, - edits, activityService, dispatchLayerService); - Dataflow dataflow = mapping.getDataflow(workflow); - for (DataflowInputPort dataflowInputPort : dataflow.getInputPorts()) - inputPorts.put(dataflowInputPort.getName(), dataflowInputPort); - facade = edits.createWorkflowInstanceFacade(dataflow, - createContext(), ""); - executionMonitor = new LocalExecutionMonitor(getWorkflowReport(), - getDataBundle(), mapping, facade.getIdentifier()); - } catch (InvalidDataflowException e) { - throw new InvalidWorkflowException(e); - } - } - - @Override - public void delete() { - cancel(); - } - - @Override - public void start() { - MonitorManager.getInstance().addObserver(executionMonitor); - /* - * have to add a result listener otherwise facade doesn't record when - * workflow is finished - */ - facade.addResultListener(this); - facade.fire(); - try { - if (DataBundles.hasInputs(getDataBundle())) { - Path inputs = 
DataBundles.getInputs(getDataBundle()); - for (Entry<String, DataflowInputPort> inputPort : inputPorts - .entrySet()) { - String portName = inputPort.getKey(); - Path path = DataBundles.getPort(inputs, portName); - if (!DataBundles.isMissing(path)) { - T2Reference identifier = referenceService.register( - convertPathToObject(path), inputPort.getValue() - .getDepth(), true, null); - int[] index = new int[] {}; - WorkflowDataToken token = new WorkflowDataToken("", - index, identifier, facade.getContext()); - try { - facade.pushData(token, portName); - } catch (TokenOrderException e) { - logger.log(SEVERE, "Unable to push data for input " - + portName, e); - } - } - } - } - } catch (IOException e) { - logger.log(SEVERE, "Error getting input data", e); - } - } - - @Override - public void pause() { - facade.pauseWorkflowRun(); - } - - @Override - public void resume() { - facade.resumeWorkflowRun(); - } - - @Override - public void cancel() { - facade.cancelWorkflowRun(); - facade.removeResultListener(this); - MonitorManager.getInstance().removeObserver(executionMonitor); - } - - @Override - protected WorkflowReport createWorkflowReport(Workflow workflow) { - return new WorkflowReport(workflow); - } - - @Override - public ProcessorReport createProcessorReport( - uk.org.taverna.scufl2.api.core.Processor processor) { - return new LocalProcessorReport(processor); - } - - @Override - public ActivityReport createActivityReport( - uk.org.taverna.scufl2.api.activity.Activity activity) { - return new ActivityReport(activity); - } - - private InvocationContext createContext() { - InvocationContext context = new InvocationContext() { - private List<Object> entities = Collections - .synchronizedList(new ArrayList<Object>()); - - @Override - public <T> List<T> getEntities(Class<T> entityType) { - List<T> entitiesOfType = new ArrayList<>(); - synchronized (entities) { - for (Object entity : entities) - if (entityType.isInstance(entity)) - entitiesOfType.add(entityType.cast(entity)); - 
} - return entitiesOfType; - } - - @Override - public void addEntity(Object entity) { - entities.add(entity); - } - - @Override - public ReferenceService getReferenceService() { - return referenceService; - } - - @Override - public ProvenanceReporter getProvenanceReporter() { - return null; - } - - }; - return context; - } - - @Override - public void resultTokenProduced(WorkflowDataToken token, String portName) { - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionEnvironment.java b/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionEnvironment.java deleted file mode 100644 index 674dcd1..0000000 --- a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionEnvironment.java +++ /dev/null @@ -1,86 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2011 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. 
- * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package uk.org.taverna.platform.execution.impl.local; - -import java.net.URI; -import java.util.Set; - -import uk.org.taverna.platform.capability.api.ActivityConfigurationException; -import uk.org.taverna.platform.capability.api.ActivityNotFoundException; -import uk.org.taverna.platform.capability.api.ActivityService; -import uk.org.taverna.platform.capability.api.DispatchLayerConfigurationException; -import uk.org.taverna.platform.capability.api.DispatchLayerNotFoundException; -import uk.org.taverna.platform.capability.api.DispatchLayerService; -import uk.org.taverna.platform.execution.api.AbstractExecutionEnvironment; - -import com.fasterxml.jackson.databind.JsonNode; - -/** - * Execution Environment for a local Taverna Dataflow Engine - * - * @author David Withers - */ -public class LocalExecutionEnvironment extends AbstractExecutionEnvironment { - - private final ActivityService activityService; - private final DispatchLayerService dispatchLayerService; - - public LocalExecutionEnvironment(LocalExecutionService localExecutionService, - ActivityService activityService, DispatchLayerService dispatchLayerService) { - super(LocalExecutionEnvironment.class.getName(), "Taverna Local Execution Environment", - "Execution Environment for a local Taverna Dataflow Engine", localExecutionService); - this.activityService = activityService; - this.dispatchLayerService = dispatchLayerService; - } - - @Override - public Set<URI> getActivityTypes() { - return activityService.getActivityTypes(); - } - - @Override - public boolean activityExists(URI uri) { - return activityService.activityExists(uri); - } - - @Override - public JsonNode getActivityConfigurationSchema(URI uri) - throws 
ActivityNotFoundException, ActivityConfigurationException { - return activityService.getActivityConfigurationSchema(uri); - } - - @Override - public Set<URI> getDispatchLayerTypes() { - return dispatchLayerService.getDispatchLayerTypes(); - } - - @Override - public boolean dispatchLayerExists(URI uri) { - return dispatchLayerService.dispatchLayerExists(uri); - } - - @Override - public JsonNode getDispatchLayerConfigurationSchema(URI uri) - throws DispatchLayerNotFoundException, DispatchLayerConfigurationException { - return dispatchLayerService.getDispatchLayerConfigurationSchema(uri); - } - -}
