Repository: crunch Updated Branches: refs/heads/master 7f54a0e22 -> 2d20e7772
CRUNCH-425: Implement the Converter#applyPTypeTransforms() logic for Crunch-on-Spark. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2d20e777 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2d20e777 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2d20e777 Branch: refs/heads/master Commit: 2d20e7772beee8f588431c6767b411895be94f6c Parents: 7f54a0e Author: Josh Wills <[email protected]> Authored: Sun Aug 24 10:06:47 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Sun Aug 24 12:39:35 2014 -0700 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/SkipPTypesIT.java | 116 +++++++++++++++++++ .../apache/crunch/impl/spark/SparkRuntime.java | 6 +- .../impl/spark/collect/InputCollection.java | 11 +- .../crunch/impl/spark/collect/InputTable.java | 18 ++- .../impl/spark/fn/InputConverterFunction.java | 2 +- .../crunch/impl/spark/fn/PairMapFunction.java | 2 +- 6 files changed, 145 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/2d20e777/crunch-spark/src/it/java/org/apache/crunch/SkipPTypesIT.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/SkipPTypesIT.java b/crunch-spark/src/it/java/org/apache/crunch/SkipPTypesIT.java new file mode 100644 index 0000000..609d975 --- /dev/null +++ b/crunch-spark/src/it/java/org/apache/crunch/SkipPTypesIT.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import org.apache.crunch.impl.spark.SparkPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.io.seq.SeqFileTableSourceTarget; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.types.Converter; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.junit.Rule; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class SkipPTypesIT { + @Rule + public TemporaryPath tempDir = new TemporaryPath(); + + PTableType<Text, LongWritable> ptt = Writables.tableOf(Writables.writables(Text.class), + Writables.writables(LongWritable.class)); + + @Test + public void testSkipPTypes() throws Exception { + String out = tempDir.getFileName("out"); + SparkPipeline pipeline = new SparkPipeline("local", "skipptypes"); + PCollection<String> shakes = pipeline.read(From.textFile(tempDir.copyResourceFileName("shakes.txt"))); + PTable<String, Long> wcnt = shakes.count(); + wcnt.write(new MySeqFileTableSourceTarget(out, ptt)); + pipeline.run(); + + PTable<Text, LongWritable> wcntIn = pipeline.read(new MySeqFileTableSourceTarget(out, ptt)); + assertEquals(new LongWritable(1L), wcntIn.materialize().iterator().next().second()); + pipeline.done(); + } + + static class ToWritables extends MapFn<Pair<String, Long>, Pair<Text, LongWritable>> { + @Override + public Pair<Text, LongWritable> map(Pair<String, Long> input) { + return Pair.of(new Text(input.first()), new LongWritable(input.second())); + } + } + static class MySeqFileTableSourceTarget extends SeqFileTableSourceTarget { + + public MySeqFileTableSourceTarget(String path, PTableType ptype) { + super(path, ptype); + } + + @Override + public Converter getConverter() { + return new SkipPTypesConverter(getType().getConverter()); + } + } + + static class SkipPTypesConverter implements Converter { + + private Converter delegate; + + public SkipPTypesConverter(Converter delegate) { + this.delegate = delegate; + } + + @Override + public Object convertInput(Object key, Object value) { + return delegate.convertInput(key, value); + } + + @Override + public Object convertIterableInput(Object key, Iterable value) { + return delegate.convertIterableInput(key, value); + } + + @Override + public Object outputKey(Object value) { + return delegate.outputKey(value); + } + + @Override + public Object outputValue(Object value) { + return delegate.outputValue(value); + } + + @Override + public Class getKeyClass() { + return delegate.getKeyClass(); + } + + @Override + public Class getValueClass() { + return delegate.getValueClass(); + } + + @Override + public boolean applyPTypeTransforms() { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/2d20e777/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java index a9537e5..b5bbc8d 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java @@ -33,6 +33,7 @@ import org.apache.crunch.PipelineExecution; import org.apache.crunch.PipelineResult; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; +import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.spark.fn.MapFunction; import org.apache.crunch.impl.spark.fn.OutputConverterFunction; @@ -299,14 +300,15 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe getRuntimeContext().setConf(sparkContext.broadcast(WritableUtils.toByteArray(conf))); if (t instanceof MapReduceTarget) { //TODO: check this earlier Converter c = t.getConverter(ptype); + IdentityFn ident = IdentityFn.getInstance(); JavaPairRDD<?, ?> outRDD; if (rdd instanceof JavaRDD) { outRDD = ((JavaRDD) rdd) - .map(new MapFunction(ptype.getOutputMapFn(), ctxt)) + .map(new MapFunction(c.applyPTypeTransforms() ? ptype.getOutputMapFn() : ident, ctxt)) .map(new OutputConverterFunction(c)); } else { outRDD = ((JavaPairRDD) rdd) - .map(new PairMapFunction(ptype.getOutputMapFn(), ctxt)) + .map(new PairMapFunction(c.applyPTypeTransforms() ? ptype.getOutputMapFn() : ident, ctxt)) .map(new OutputConverterFunction(c)); } try { http://git-wip-us.apache.org/repos/asf/crunch/blob/2d20e777/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java index 237c8de..8273236 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java @@ -17,8 +17,10 @@ */ package org.apache.crunch.impl.spark.collect; +import org.apache.crunch.MapFn; import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.Source; +import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.impl.dist.collect.BaseInputCollection; import org.apache.crunch.impl.mr.run.CrunchInputFormat; @@ -26,6 +28,7 @@ import org.apache.crunch.impl.spark.SparkCollection; import org.apache.crunch.impl.spark.SparkRuntime; import org.apache.crunch.impl.spark.fn.InputConverterFunction; import org.apache.crunch.impl.spark.fn.MapFunction; +import org.apache.crunch.types.Converter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.spark.api.java.JavaPairRDD; @@ -44,15 +47,17 @@ public class InputCollection<S> extends BaseInputCollection<S> implements SparkC Job job = new Job(runtime.getConfiguration()); FileInputFormat.addInputPaths(job, "/tmp"); //placeholder source.configureSource(job, -1); + Converter converter = source.getConverter(); JavaPairRDD<?, ?> input = runtime.getSparkContext().newAPIHadoopRDD( job.getConfiguration(), CrunchInputFormat.class, - source.getConverter().getKeyClass(), - source.getConverter().getValueClass()); + converter.getKeyClass(), + converter.getValueClass()); input.rdd().setName(source.toString()); + MapFn mapFn = converter.applyPTypeTransforms() ? source.getType().getInputMapFn() : IdentityFn.getInstance(); return input .map(new InputConverterFunction(source.getConverter())) - .map(new MapFunction(source.getType().getInputMapFn(), runtime.getRuntimeContext())); + .map(new MapFunction(mapFn, runtime.getRuntimeContext())); } catch (IOException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/crunch/blob/2d20e777/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java index 606ffce..e83d912 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java @@ -17,14 +17,20 @@ */ package org.apache.crunch.impl.spark.collect; +import org.apache.crunch.MapFn; import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.TableSource; +import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.impl.dist.collect.BaseInputTable; import org.apache.crunch.impl.mr.run.CrunchInputFormat; import org.apache.crunch.impl.spark.SparkCollection; import org.apache.crunch.impl.spark.SparkRuntime; +import org.apache.crunch.impl.spark.fn.InputConverterFunction; +import org.apache.crunch.impl.spark.fn.PairMapFunction; +import org.apache.crunch.types.Converter; import org.apache.hadoop.mapreduce.Job; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDDLike; import java.io.IOException; @@ -40,11 +46,17 @@ public class InputTable<K, V> extends BaseInputTable<K, V> implements SparkColle try { Job job = new Job(runtime.getConfiguration()); source.configureSource(job, -1); // TODO: a custom input format for crunch-spark - return runtime.getSparkContext().newAPIHadoopRDD( + Converter converter = source.getConverter(); + JavaPairRDD<?, ?> input = runtime.getSparkContext().newAPIHadoopRDD( job.getConfiguration(), CrunchInputFormat.class, - source.getConverter().getKeyClass(), - source.getConverter().getValueClass()); + converter.getKeyClass(), + converter.getValueClass()); + input.rdd().setName(source.toString()); + MapFn mapFn = converter.applyPTypeTransforms() ? source.getType().getInputMapFn() : IdentityFn.getInstance(); + return input + .map(new InputConverterFunction(source.getConverter())) + .map(new PairMapFunction(mapFn, runtime.getRuntimeContext())); } catch (IOException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/crunch/blob/2d20e777/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java index 52869a4..36745c1 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java @@ -30,6 +30,6 @@ public class InputConverterFunction<K, V, S> extends Function<Tuple2<K, V>, S> { @Override public S call(Tuple2<K, V> kv) throws Exception { - return converter.convertInput(kv._1, kv._2); + return converter.convertInput(kv._1(), kv._2()); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/2d20e777/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java index 6db30f0..673bbab 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java @@ -39,6 +39,6 @@ public class PairMapFunction<K, V, S> extends Function<Tuple2<K, V>, S> { ctxt.initialize(fn); initialized = true; } - return fn.map(Pair.of(kv._1, kv._2)); + return fn.map(Pair.of(kv._1(), kv._2())); } }
