Resolve some generics warnings with some fancier footwork
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2820534a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2820534a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2820534a

Branch: refs/heads/master
Commit: 2820534a54fe77bf8220da9bb0cd1d4ce6c8b2b3
Parents: 5069eed
Author: Sean Owen <so...@cloudera.com>
Authored: Fri Jul 3 18:38:02 2015 +0100
Committer: Tom White <t...@cloudera.com>
Committed: Thu Mar 10 11:15:14 2016 +0000

----------------------------------------------------------------------
 runners/spark/README.md                            |  2 +-
 runners/spark/pom.xml                              |  2 +-
 .../com/cloudera/dataflow/hadoop/HadoopIO.java     | 17 +++++++++--------
 .../cloudera/dataflow/spark/EvaluationContext.java |  2 +-
 .../dataflow/spark/SparkPipelineRunner.java        | 13 +++++++------
 .../dataflow/spark/SparkRuntimeContext.java        |  7 ++++---
 .../dataflow/spark/TransformEvaluator.java         |  2 +-
 .../dataflow/spark/TransformTranslator.java        |  6 +++---
 .../spark/HadoopFileFormatPipelineTest.java        | 15 ++++++++++-----
 9 files changed, 37 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/README.md
----------------------------------------------------------------------
diff --git a/runners/spark/README.md b/runners/spark/README.md
index 52f7f9b..d93f554 100644
--- a/runners/spark/README.md
+++ b/runners/spark/README.md
@@ -66,7 +66,7 @@ First download a text document to use as input:
 Then run the [word count example][wc] from the SDK using a single threaded Spark instance
 in local mode:
 
-    mvn exec:exec -Dclass=com.google.cloud.dataflow.examples.WordCount \
+    mvn exec:exec -DmainClass=com.google.cloud.dataflow.examples.WordCount \
     -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkPipelineRunner \
     -DsparkMaster=local

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 1ab37b7..0382108 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -194,7 +194,7 @@ License.
             <arguments>
               <argument>-classpath</argument>
               <classpath />
-              <argument>${class}</argument>
+              <argument>${mainClass}</argument>
               <argument>--input=${input}</argument>
               <argument>--output=${output}</argument>
               <argument>--runner=${runner}</argument>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
index 434a132..5873b9f 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
@@ -32,20 +32,21 @@ public final class HadoopIO {
     private Read() {
     }
 
-    public static Bound from(String filepattern) {
-      return new Bound().from(filepattern);
+    public static <K, V> Bound<K, V> from(String filepattern) {
+      return new Bound<K, V>().from(filepattern);
     }
 
-    public static Bound withFormatClass(Class<? extends FileInputFormat<?, ?>> format) {
-      return new Bound().withFormatClass(format);
+    public static <K, V> Bound<K, V> withFormatClass(
+        Class<? extends FileInputFormat<K, V>> format) {
+      return new Bound<K, V>().withFormatClass(format);
     }
 
-    public static Bound withKeyClass(Class<?> key) {
-      return new Bound().withKeyClass(key);
+    public static <K, V> Bound<K, V> withKeyClass(Class<K> key) {
+      return new Bound<K, V>().withKeyClass(key);
    }
 
-    public static Bound withValueClass(Class<?> value) {
-      return new Bound().withValueClass(value);
+    public static <K, V> Bound<K, V> withValueClass(Class<V> value) {
+      return new Bound<K, V>().withValueClass(value);
     }
 
     public static class Bound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
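A note on the HadoopIO change above: the raw-type warnings came from the unparameterized Bound return type, and the fix is to give each static factory its own <K, V> so the builder is fully typed from the first call. A minimal self-contained sketch of that pattern (class and field names here are illustrative, not the actual HadoopIO code):

    // Sketch: per-method type parameters keep the returned builder typed.
    public final class Read {

      public static class Bound<K, V> {
        private String filepattern;
        private Class<K> keyClass;

        public Bound<K, V> from(String pattern) {
          this.filepattern = pattern;
          return this;
        }

        public Bound<K, V> withKeyClass(Class<K> key) {
          this.keyClass = key;
          return this;
        }
      }

      // The factory declares <K, V> itself; callers can let inference do
      // the work or pin the types: Read.<IntWritable, Text>from("/path").
      public static <K, V> Bound<K, V> from(String filepattern) {
        return new Bound<K, V>().from(filepattern);
      }
    }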
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
index 7906775..56f8521 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
@@ -127,7 +127,7 @@ public class EvaluationContext implements EvaluationResult {
     leafRdds.add(rdd);
   }
 
-  JavaRDDLike<?, ?> getInputRDD(PTransform transform) {
+  JavaRDDLike<?, ?> getInputRDD(PTransform<? extends PInput, ?> transform) {
     return getRDD((PValue) getInput(transform));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
index 36685c3..792888d 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
@@ -183,15 +183,16 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
       doVisitTransform(node);
     }
 
-    private <PT extends PTransform> void doVisitTransform(TransformTreeNode node) {
+    private <PT extends PTransform<? super PInput, POutput>>
+        void doVisitTransform(TransformTreeNode node) {
+      @SuppressWarnings("unchecked")
       PT transform = (PT) node.getTransform();
       @SuppressWarnings("unchecked")
-      TransformEvaluator<PT> evaluator = (TransformEvaluator<PT>)
-          TransformTranslator.getTransformEvaluator(transform.getClass());
+      Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
+      TransformEvaluator<PT> evaluator = TransformTranslator.getTransformEvaluator(transformClass);
       LOG.info("Evaluating {}", transform);
-      AppliedPTransform<PInput, POutput, ? extends PTransform> appliedTransform =
-          AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(),
-          (PTransform) transform);
+      AppliedPTransform<PInput, POutput, PT> appliedTransform =
+          AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform);
       ctxt.setCurrentTransform(appliedTransform);
       evaluator.evaluate(transform, ctxt);
       ctxt.setCurrentTransform(null);
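The (Class<PT>) (Class<?>) double cast in doVisitTransform is worth calling out: transform.getClass() can only yield Class<? extends PTransform>, never Class<PT>, so the narrowing has to be asserted, and routing it through Class<?> keeps the unavoidable @SuppressWarnings("unchecked") pinned to a single local. In isolation the shape is roughly this (a sketch, not the runner's actual code):

    import com.google.cloud.dataflow.sdk.transforms.PTransform;

    final class TransformClasses {
      // getClass() erases the type parameter; the intermediate Class<?>
      // cast makes the unchecked narrowing explicit and locally scoped.
      static <PT extends PTransform<?, ?>> Class<PT> classOf(PT transform) {
        @SuppressWarnings("unchecked")
        Class<PT> clazz = (Class<PT>) (Class<?>) transform.getClass();
        return clazz;
      }
    }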
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java
index 51db39b..ec590a9 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java
@@ -94,8 +94,9 @@ public class SparkRuntimeContext implements Serializable {
   }
 
   public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) {
-    final T aggregatorValue = (T) getAggregatorValue(aggregator.getName(),
-        aggregator.getCombineFn().getOutputType().getRawType());
+    @SuppressWarnings("unchecked")
+    Class<T> aggValueClass = (Class<T>) aggregator.getCombineFn().getOutputType().getRawType();
+    final T aggregatorValue = getAggregatorValue(aggregator.getName(), aggValueClass);
     return new AggregatorValues<T>() {
       @Override
       public Collection<T> getValues() {
@@ -150,7 +151,7 @@ public class SparkRuntimeContext implements Serializable {
     return coderRegistry;
   }
 
-  private Coder getCoder(Combine.CombineFn<?, ?, ?> combiner) {
+  private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
     try {
       if (combiner.getClass() == Sum.SumIntegerFn.class) {
         return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java
index bf45d12..52842d5 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java
@@ -19,6 +19,6 @@ import java.io.Serializable;
 
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 
-public interface TransformEvaluator<PT extends PTransform> extends Serializable {
+public interface TransformEvaluator<PT extends PTransform<?, ?>> extends Serializable {
   void evaluate(PT transform, EvaluationContext context);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
index 2689424..b0fd4a3 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
@@ -633,12 +633,12 @@ public final class TransformTranslator {
     EVALUATORS.put(Window.Bound.class, window());
   }
 
-  public static <PT extends PTransform> boolean hasTransformEvaluator(Class<PT> clazz) {
+  public static <PT extends PTransform<?, ?>> boolean hasTransformEvaluator(Class<PT> clazz) {
     return EVALUATORS.containsKey(clazz);
   }
 
-  public static <PT extends PTransform> TransformEvaluator<PT> getTransformEvaluator(Class<PT>
-      clazz) {
+  public static <PT extends PTransform<?, ?>> TransformEvaluator<PT> getTransformEvaluator(Class<PT>
+      clazz) {
     @SuppressWarnings("unchecked")
     TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
     if (transform == null) {
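getTransformEvaluator above is the classic type-safe heterogeneous-map lookup: EVALUATORS is keyed by transform class, and the map's declared type cannot record that each value matches its own key, so one unchecked cast at the get() is unavoidable, and that is the right place for the @SuppressWarnings. A sketch of the same shape, assuming it sits next to TransformEvaluator in the runner's package (registry and method names are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import com.google.cloud.dataflow.sdk.transforms.PTransform;

    final class EvaluatorRegistry {
      // By convention each entry maps a transform class to an evaluator
      // for exactly that class; the map type alone cannot express this.
      private static final Map<Class<?>, TransformEvaluator<?>> EVALUATORS =
          new HashMap<Class<?>, TransformEvaluator<?>>();

      static <PT extends PTransform<?, ?>> TransformEvaluator<PT> lookup(Class<PT> clazz) {
        @SuppressWarnings("unchecked")
        TransformEvaluator<PT> evaluator = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
        return evaluator;
      }
    }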
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
index 9aa634e..1fd8e41 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.junit.Before;
 import org.junit.Rule;
@@ -63,11 +64,15 @@ public class HadoopFileFormatPipelineTest {
     populateFile();
 
     Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-    PCollection<KV<IntWritable, Text>> input = (PCollection<KV<IntWritable, Text>>)
-        p.apply(HadoopIO.Read.from(inputFile.getAbsolutePath())
-        .withFormatClass(SequenceFileInputFormat.class)
-        .withKeyClass(IntWritable.class)
-        .withValueClass(Text.class));
+    @SuppressWarnings("unchecked")
+    Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
+        (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class;
+    HadoopIO.Read.Bound<IntWritable,Text> bound =
+        HadoopIO.Read.<IntWritable,Text>from(inputFile.getAbsolutePath())
+            .withKeyClass(IntWritable.class)
+            .withValueClass(Text.class)
+            .withFormatClass(inputFormatClass);
+    PCollection<KV<IntWritable, Text>> input = p.apply(bound);
     input.apply(ParDo.of(new TabSeparatedString()))
         .apply(TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());
     EvaluationResult res = SparkPipelineRunner.create().run(p);
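Finally, the test's cast chain exists because a class literal can never carry generic arguments: SequenceFileInputFormat.class has type Class<SequenceFileInputFormat>, while withFormatClass now wants Class<? extends FileInputFormat<IntWritable, Text>>. Casting through Class<?> asserts the parameterization under a single suppression, along these lines (a sketch under that assumption, not the test's actual code):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    final class Formats {
      // The literal is Class<SequenceFileInputFormat>; its K and V cannot
      // be written in source, so the cast asserts them unchecked.
      @SuppressWarnings("unchecked")
      static Class<? extends FileInputFormat<IntWritable, Text>> sequenceFileFormat() {
        return (Class<? extends FileInputFormat<IntWritable, Text>>)
            (Class<?>) SequenceFileInputFormat.class;
      }
    }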