Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 b6791f86a -> a5c592768
http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java index 0d1d5e0..237c8de 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java @@ -17,8 +17,8 @@ */ package org.apache.crunch.impl.spark.collect; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.Source; -import org.apache.crunch.SourceTarget; import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.impl.dist.collect.BaseInputCollection; import org.apache.crunch.impl.mr.run.CrunchInputFormat; @@ -26,7 +26,6 @@ import org.apache.crunch.impl.spark.SparkCollection; import org.apache.crunch.impl.spark.SparkRuntime; import org.apache.crunch.impl.spark.fn.InputConverterFunction; import org.apache.crunch.impl.spark.fn.MapFunction; -import org.apache.crunch.io.impl.FileSourceImpl; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.spark.api.java.JavaPairRDD; @@ -36,8 +35,8 @@ import java.io.IOException; public class InputCollection<S> extends BaseInputCollection<S> implements SparkCollection { - InputCollection(Source<S> source, DistributedPipeline pipeline) { - super(source, pipeline); + InputCollection(Source<S> source, DistributedPipeline pipeline, ParallelDoOptions doOpts) { + super(source, pipeline, doOpts); } public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) { http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java index 7e8471c..606ffce 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.impl.spark.collect; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.TableSource; import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.impl.dist.collect.BaseInputTable; @@ -30,8 +31,8 @@ import java.io.IOException; public class InputTable<K, V> extends BaseInputTable<K, V> implements SparkCollection { - public InputTable(TableSource<K, V> source, DistributedPipeline pipeline) { - super(source, pipeline); + public InputTable(TableSource<K, V> source, DistributedPipeline pipeline, ParallelDoOptions doOpts) { + super(source, pipeline, doOpts); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java index 389d91c..1421945 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java @@ -43,13 +43,19 @@ import java.util.List; public class SparkCollectFactory implements PCollectionFactory { @Override - public <S> BaseInputCollection<S> createInputCollection(Source<S> source, DistributedPipeline pipeline) { - return new InputCollection<S>(source, pipeline); + public <S> BaseInputCollection<S> createInputCollection( + Source<S> source, + DistributedPipeline pipeline, + ParallelDoOptions doOpts) { + return new InputCollection<S>(source, pipeline, doOpts); } @Override - public <K, V> BaseInputTable<K, V> createInputTable(TableSource<K, V> source, DistributedPipeline pipeline) { - return new InputTable<K, V>(source, pipeline); + public <K, V> BaseInputTable<K, V> createInputTable( + TableSource<K, V> source, + DistributedPipeline pipeline, + ParallelDoOptions doOpts) { + return new InputTable<K, V>(source, pipeline, doOpts); } @Override
