Repository: crunch Updated Branches: refs/heads/master 9f4193163 -> 6e3560651
CRUNCH-509: Add support for 'named' targets in Crunch-on-Spark Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6e356065 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6e356065 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6e356065 Branch: refs/heads/master Commit: 6e3560651ed3a0bd47cab2ad0ec1c808e599e050 Parents: 9f41931 Author: Josh Wills <[email protected]> Authored: Tue May 5 23:54:29 2015 +0100 Committer: Josh Wills <[email protected]> Committed: Fri May 8 22:43:02 2015 +0100 ---------------------------------------------------------------------- .../org/apache/crunch/io/avro/AvroFileTarget.java | 8 +------- .../apache/crunch/io/avro/AvroPathPerKeyTarget.java | 8 +------- .../apache/crunch/types/avro/AvroOutputFormat.java | 9 +-------- .../org/apache/crunch/impl/spark/SparkRuntime.java | 16 ++++++++++++++-- 4 files changed, 17 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/6e356065/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java index ed8d7b7..37aa05b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java @@ -76,16 +76,10 @@ public class AvroFileTarget extends FileTargetImpl { public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) { AvroType<?> atype = (AvroType<?>) ptype; FormatBundle bundle = FormatBundle.forOutput(AvroOutputFormat.class); - String schemaParam = null; - if (name == null) { - schemaParam = "avro.output.schema"; - } else { - schemaParam = "avro.output.schema." + name; - } for (Map.Entry<String, String> e : extraConf.entrySet()) { bundle.set(e.getKey(), e.getValue()); } - bundle.set(schemaParam, atype.getSchema().toString()); + bundle.set("avro.output.schema", atype.getSchema().toString()); AvroMode.fromType(atype).configure(bundle); configureForMapReduce(job, AvroWrapper.class, NullWritable.class, bundle, outputPath, name); http://git-wip-us.apache.org/repos/asf/crunch/blob/6e356065/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java index 76a2cf7..336b940 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java @@ -78,13 +78,7 @@ public class AvroPathPerKeyTarget extends FileTargetImpl { public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) { AvroType<?> atype = (AvroType) ((PTableType) ptype).getValueType(); FormatBundle bundle = FormatBundle.forOutput(AvroPathPerKeyOutputFormat.class); - String schemaParam; - if (name == null) { - schemaParam = "avro.output.schema"; - } else { - schemaParam = "avro.output.schema." + name; - } - bundle.set(schemaParam, atype.getSchema().toString()); + bundle.set("avro.output.schema", atype.getSchema().toString()); AvroMode.fromType(atype).configure(bundle); configureForMapReduce(job, AvroWrapper.class, NullWritable.class, bundle, outputPath, name); } http://git-wip-us.apache.org/repos/asf/crunch/blob/6e356065/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java index 79736b8..30a7399 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java @@ -37,14 +37,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> { public static <S> DataFileWriter<S> getDataFileWriter(Path path, Configuration conf) throws IOException { - Schema schema = null; - String outputName = conf.get("crunch.namedoutput"); - if (outputName != null && !outputName.isEmpty()) { - schema = (new Schema.Parser()).parse(conf.get("avro.output.schema." + outputName)); - } else { - schema = AvroJob.getOutputSchema(conf); - } - + Schema schema = AvroJob.getOutputSchema(conf); DataFileWriter<S> writer = new DataFileWriter<S>(AvroMode.fromConfiguration(conf).<S>getWriter(schema)); JobConf jc = new JobConf(conf); http://git-wip-us.apache.org/repos/asf/crunch/blob/6e356065/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java index 3b5b419..4c0cb27 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java @@ -38,6 +38,7 @@ import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.spark.fn.MapFunction; import org.apache.crunch.impl.spark.fn.OutputConverterFunction; import org.apache.crunch.impl.spark.fn.PairMapFunction; +import org.apache.crunch.io.CrunchOutputs; import org.apache.crunch.io.MapReduceTarget; import org.apache.crunch.io.PathTarget; import org.apache.crunch.materialize.MaterializableIterable; @@ -323,7 +324,13 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe Job job = new Job(conf); if (t instanceof PathTarget) { PathTarget pt = (PathTarget) t; - pt.configureForMapReduce(job, ptype, pt.getPath(), null); + pt.configureForMapReduce(job, ptype, pt.getPath(), "out0"); + CrunchOutputs.OutputConfig outConfig = + CrunchOutputs.getNamedOutputs(job.getConfiguration()).get("out0"); + job.setOutputFormatClass(outConfig.bundle.getFormatClass()); + job.setOutputKeyClass(outConfig.keyClass); + job.setOutputValueClass(outConfig.valueClass); + outConfig.bundle.configure(job.getConfiguration()); Path tmpPath = pipeline.createTempPath(); outRDD.saveAsNewAPIHadoopFile( tmpPath.toString(), @@ -334,7 +341,12 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe pt.handleOutputs(job.getConfiguration(), tmpPath, -1); } else if (t instanceof MapReduceTarget) { MapReduceTarget mrt = (MapReduceTarget) t; - mrt.configureForMapReduce(job, ptype, new Path("/tmp"), null); + mrt.configureForMapReduce(job, ptype, new Path("/tmp"), "out0"); + CrunchOutputs.OutputConfig outConfig = + CrunchOutputs.getNamedOutputs(job.getConfiguration()).get("out0"); + job.setOutputFormatClass(outConfig.bundle.getFormatClass()); + job.setOutputKeyClass(outConfig.keyClass); + job.setOutputValueClass(outConfig.valueClass); outRDD.saveAsHadoopDataset(new JobConf(job.getConfiguration())); } else { throw new IllegalArgumentException("Spark execution cannot handle non-MapReduceTarget: " + t);
