Resolve some generics warnings with some fancier footwork

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2820534a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2820534a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2820534a

Branch: refs/heads/master
Commit: 2820534a54fe77bf8220da9bb0cd1d4ce6c8b2b3
Parents: 5069eed
Author: Sean Owen <so...@cloudera.com>
Authored: Fri Jul 3 18:38:02 2015 +0100
Committer: Tom White <t...@cloudera.com>
Committed: Thu Mar 10 11:15:14 2016 +0000

----------------------------------------------------------------------
 runners/spark/README.md                            |  2 +-
 runners/spark/pom.xml                              |  2 +-
 .../com/cloudera/dataflow/hadoop/HadoopIO.java     | 17 +++++++++--------
 .../cloudera/dataflow/spark/EvaluationContext.java |  2 +-
 .../dataflow/spark/SparkPipelineRunner.java        | 13 +++++++------
 .../dataflow/spark/SparkRuntimeContext.java        |  7 ++++---
 .../dataflow/spark/TransformEvaluator.java         |  2 +-
 .../dataflow/spark/TransformTranslator.java        |  6 +++---
 .../spark/HadoopFileFormatPipelineTest.java        | 15 ++++++++++-----
 9 files changed, 37 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/README.md
----------------------------------------------------------------------
diff --git a/runners/spark/README.md b/runners/spark/README.md
index 52f7f9b..d93f554 100644
--- a/runners/spark/README.md
+++ b/runners/spark/README.md
@@ -66,7 +66,7 @@ First download a text document to use as input:
 Then run the [word count example][wc] from the SDK using a single threaded 
Spark instance
 in local mode:
 
-    mvn exec:exec -Dclass=com.google.cloud.dataflow.examples.WordCount \
+    mvn exec:exec -DmainClass=com.google.cloud.dataflow.examples.WordCount \
       -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkPipelineRunner 
\
       -DsparkMaster=local
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 1ab37b7..0382108 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -194,7 +194,7 @@ License.
                         <arguments>
                             <argument>-classpath</argument>
                             <classpath />
-                            <argument>${class}</argument>
+                            <argument>${mainClass}</argument>
                             <argument>--input=${input}</argument>
                             <argument>--output=${output}</argument>
                             <argument>--runner=${runner}</argument>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java 
b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
index 434a132..5873b9f 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
@@ -32,20 +32,21 @@ public final class HadoopIO {
     private Read() {
     }
 
-    public static Bound from(String filepattern) {
-      return new Bound().from(filepattern);
+    public static <K, V> Bound<K, V> from(String filepattern) {
+      return new Bound<K, V>().from(filepattern);
     }
 
-    public static Bound withFormatClass(Class<? extends FileInputFormat<?, ?>> 
format) {
-      return new Bound().withFormatClass(format);
+    public static <K, V> Bound<K, V> withFormatClass(
+        Class<? extends FileInputFormat<K, V>> format) {
+      return new Bound<K, V>().withFormatClass(format);
     }
 
-    public static Bound withKeyClass(Class<?> key) {
-      return new Bound().withKeyClass(key);
+    public static <K, V> Bound<K, V> withKeyClass(Class<K> key) {
+      return new Bound<K, V>().withKeyClass(key);
     }
 
-    public static Bound withValueClass(Class<?> value) {
-      return new Bound().withValueClass(value);
+    public static <K, V> Bound<K, V> withValueClass(Class<V> value) {
+      return new Bound<K, V>().withValueClass(value);
     }
 
     public static class Bound<K, V> extends PTransform<PInput, 
PCollection<KV<K, V>>> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
index 7906775..56f8521 100644
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
+++ 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
@@ -127,7 +127,7 @@ public class EvaluationContext implements EvaluationResult {
     leafRdds.add(rdd);
   }
 
-  JavaRDDLike<?, ?> getInputRDD(PTransform transform) {
+  JavaRDDLike<?, ?> getInputRDD(PTransform<? extends PInput, ?> transform) {
     return getRDD((PValue) getInput(transform));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
index 36685c3..792888d 100644
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
+++ 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
@@ -183,15 +183,16 @@ public final class SparkPipelineRunner extends 
PipelineRunner<EvaluationResult>
       doVisitTransform(node);
     }
 
-    private <PT extends PTransform> void doVisitTransform(TransformTreeNode 
node) {
+    private <PT extends PTransform<? super PInput, POutput>>
+        void doVisitTransform(TransformTreeNode node) {
+      @SuppressWarnings("unchecked")
       PT transform = (PT) node.getTransform();
       @SuppressWarnings("unchecked")
-      TransformEvaluator<PT> evaluator = (TransformEvaluator<PT>)
-          TransformTranslator.getTransformEvaluator(transform.getClass());
+      Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
+      TransformEvaluator<PT> evaluator = 
TransformTranslator.getTransformEvaluator(transformClass);
       LOG.info("Evaluating {}", transform);
-      AppliedPTransform<PInput, POutput, ? extends PTransform> 
appliedTransform =
-          AppliedPTransform.of(node.getFullName(), node.getInput(), 
node.getOutput(),
-              (PTransform) transform);
+      AppliedPTransform<PInput, POutput, PT> appliedTransform =
+          AppliedPTransform.of(node.getFullName(), node.getInput(), 
node.getOutput(), transform);
       ctxt.setCurrentTransform(appliedTransform);
       evaluator.evaluate(transform, ctxt);
       ctxt.setCurrentTransform(null);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java
 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java
index 51db39b..ec590a9 100644
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java
+++ 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java
@@ -94,8 +94,9 @@ public class SparkRuntimeContext implements Serializable {
   }
 
   public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> 
aggregator) {
-    final T aggregatorValue = (T) getAggregatorValue(aggregator.getName(),
-        aggregator.getCombineFn().getOutputType().getRawType());
+    @SuppressWarnings("unchecked")
+    Class<T> aggValueClass = (Class<T>) 
aggregator.getCombineFn().getOutputType().getRawType();
+    final T aggregatorValue = getAggregatorValue(aggregator.getName(), 
aggValueClass);
     return new AggregatorValues<T>() {
       @Override
       public Collection<T> getValues() {
@@ -150,7 +151,7 @@ public class SparkRuntimeContext implements Serializable {
     return coderRegistry;
   }
 
-  private Coder getCoder(Combine.CombineFn<?, ?, ?> combiner) {
+  private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
     try {
       if (combiner.getClass() == Sum.SumIntegerFn.class) {
         return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java
 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java
index bf45d12..52842d5 100644
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java
+++ 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java
@@ -19,6 +19,6 @@ import java.io.Serializable;
 
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 
-public interface TransformEvaluator<PT extends PTransform> extends 
Serializable {
+public interface TransformEvaluator<PT extends PTransform<?, ?>> extends 
Serializable {
   void evaluate(PT transform, EvaluationContext context);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
index 2689424..b0fd4a3 100644
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
+++ 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
@@ -633,12 +633,12 @@ public final class TransformTranslator {
     EVALUATORS.put(Window.Bound.class, window());
   }
 
-  public static <PT extends PTransform> boolean 
hasTransformEvaluator(Class<PT> clazz) {
+  public static <PT extends PTransform<?, ?>> boolean 
hasTransformEvaluator(Class<PT> clazz) {
     return EVALUATORS.containsKey(clazz);
   }
 
-  public static <PT extends PTransform> TransformEvaluator<PT> 
getTransformEvaluator(Class<PT>
-                                                                               
          clazz) {
+  public static <PT extends PTransform<?, ?>> TransformEvaluator<PT> 
getTransformEvaluator(Class<PT>
+                                                                               
            clazz) {
     @SuppressWarnings("unchecked")
     TransformEvaluator<PT> transform = (TransformEvaluator<PT>) 
EVALUATORS.get(clazz);
     if (transform == null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2820534a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
 
b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
index 9aa634e..1fd8e41 100644
--- 
a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
+++ 
b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.junit.Before;
 import org.junit.Rule;
@@ -63,11 +64,15 @@ public class HadoopFileFormatPipelineTest {
     populateFile();
 
     Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-    PCollection<KV<IntWritable, Text>> input = (PCollection<KV<IntWritable, 
Text>>)
-            p.apply(HadoopIO.Read.from(inputFile.getAbsolutePath())
-            .withFormatClass(SequenceFileInputFormat.class)
-            .withKeyClass(IntWritable.class)
-            .withValueClass(Text.class));
+    @SuppressWarnings("unchecked")
+    Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
+        (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) 
SequenceFileInputFormat.class;
+    HadoopIO.Read.Bound<IntWritable,Text> bound =
+        HadoopIO.Read.<IntWritable,Text>from(inputFile.getAbsolutePath())
+        .withKeyClass(IntWritable.class)
+        .withValueClass(Text.class)
+        .withFormatClass(inputFormatClass);
+    PCollection<KV<IntWritable, Text>> input = p.apply(bound);
     input.apply(ParDo.of(new TabSeparatedString()))
         
.apply(TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());
     EvaluationResult res = SparkPipelineRunner.create().run(p);

Reply via email to