Repository: incubator-beam
Updated Branches:
  refs/heads/master 03b89c065 -> 2492604e4


Improve compile-time type checking

* Enables -Xlint:rawtypes except for Spark and Flink runners and the
  microbenchmarks module

* Fixed some warnings in tests

* Fixed checkstyle warnings


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3e6fe3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3e6fe3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3e6fe3f

Branch: refs/heads/master
Commit: e3e6fe3fc4cf462f84271acc8447798760ae770f
Parents: 03b89c0
Author: Eugene Kirpichov <[email protected]>
Authored: Thu Sep 29 14:00:41 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Fri Oct 7 09:49:35 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       |  1 +
 .../direct/ImmutableListBundleFactory.java      |  2 +-
 .../beam/runners/direct/ParDoEvaluator.java     |  7 +-
 .../beam/runners/direct/StructuralKey.java      |  2 +-
 .../direct/TestStreamEvaluatorFactory.java      |  4 +-
 .../beam/runners/direct/WatermarkManager.java   |  2 +-
 .../beam/runners/dataflow/DataflowRunner.java   | 26 ++-----
 .../runners/dataflow/internal/IsmFormat.java    |  5 +-
 .../beam/sdk/io/PubsubUnboundedSource.java      |  2 +-
 .../beam/sdk/options/PipelineOptions.java       |  9 ++-
 .../org/apache/beam/sdk/testing/TestStream.java |  2 +-
 .../beam/sdk/transforms/DoFnAdapters.java       |  3 +-
 .../sdk/transforms/reflect/DoFnInvokers.java    |  8 +-
 .../sdk/transforms/reflect/DoFnSignature.java   |  4 +-
 .../sdk/transforms/reflect/DoFnSignatures.java  | 11 ++-
 .../beam/sdk/coders/CoderRegistryTest.java      |  1 +
 .../sdk/options/PipelineOptionsFactoryTest.java |  6 +-
 .../beam/sdk/testing/TestPipelineTest.java      |  4 +-
 .../apache/beam/sdk/transforms/CombineTest.java | 77 ++++++++------------
 .../apache/beam/sdk/transforms/ParDoTest.java   |  4 +-
 .../display/DisplayDataEvaluator.java           |  6 +-
 .../transforms/reflect/DoFnInvokersTest.java    |  2 +-
 .../beam/sdk/io/hdfs/AvroHDFSFileSource.java    |  2 +-
 .../beam/sdk/io/hdfs/AvroWrapperCoder.java      |  2 +-
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java |  7 +-
 .../simpleauth/SimpleAuthHDFSFileSource.java    |  5 +-
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  |  3 +-
 .../beam/sdk/io/kinesis/CustomOptional.java     |  5 +-
 28 files changed, 93 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 67ec3e6..abcc57b 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -280,6 +280,7 @@ public class DirectRunner
     return result;
   }
 
+  @SuppressWarnings("rawtypes")
   private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
       defaultModelEnforcements(DirectOptions options) {
     ImmutableMap.Builder<Class<? extends PTransform>, 
Collection<ModelEnforcementFactory>>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index 53b7e54..4972340 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -111,7 +111,7 @@ class ImmutableListBundleFactory implements BundleFactory {
         StructuralKey<?> key,
         Iterable<WindowedValue<T>> committedElements,
         Instant synchronizedCompletionTime) {
-      return new 
AutoValue_ImmutableListBundleFactory_CommittedImmutableListBundle(
+      return new 
AutoValue_ImmutableListBundleFactory_CommittedImmutableListBundle<>(
           pcollection, key, committedElements, synchronizedCompletionTime);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index a761289..bbe7827 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -160,15 +160,14 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
       undeclaredOutputs = new HashMap<>();
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      @SuppressWarnings("rawtypes")
       UncommittedBundle bundle = bundles.get(tag);
       if (bundle == null) {
-        List undeclaredContents = undeclaredOutputs.get(tag);
+        List<WindowedValue<T>> undeclaredContents = (List) 
undeclaredOutputs.get(tag);
         if (undeclaredContents == null) {
-          undeclaredContents = new ArrayList<T>();
+          undeclaredContents = new ArrayList<>();
           undeclaredOutputs.put(tag, undeclaredContents);
         }
         undeclaredContents.add(output);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
index 61332f9..0d0f8e7 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
@@ -89,7 +89,7 @@ abstract class StructuralKey<K> {
         return true;
       }
       if (other instanceof CoderStructuralKey) {
-        CoderStructuralKey that = (CoderStructuralKey) other;
+        CoderStructuralKey<?> that = (CoderStructuralKey<?>) other;
         return structuralValue.equals(that.structuralValue);
       }
       return false;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index ffb4fb5..4a48a58 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -171,13 +171,13 @@ class TestStreamEvaluatorFactory implements 
TransformEvaluatorFactory {
     private static class DirectTestStream<T> extends PTransform<PBegin, 
PCollection<T>> {
       private final TestStream<T> original;
 
-      private DirectTestStream(TestStream transform) {
+      private DirectTestStream(TestStream<T> transform) {
         this.original = transform;
       }
 
       @Override
       public PCollection<T> apply(PBegin input) {
-        PipelineRunner runner = input.getPipeline().getRunner();
+        PipelineRunner<?> runner = input.getPipeline().getRunner();
         checkState(
             runner instanceof DirectRunner,
             "%s can only be used when running with the %s",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index b3d1fc5..21cb427 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -859,7 +859,7 @@ public class WatermarkManager {
 
   private void applyPendingUpdate(PendingWatermarkUpdate pending) {
     CommittedResult result = pending.getResult();
-    AppliedPTransform transform = result.getTransform();
+    AppliedPTransform<?, ?, ?> transform = result.getTransform();
     CommittedBundle<?> inputBundle = pending.getInputBundle();
 
     updatePending(inputBundle, pending.getTimerUpdate(), result);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ceaf6a0..64ac3ad 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2081,24 +2081,17 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
 
     static {
       DataflowPipelineTranslator.registerTransformTranslator(
-          StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator());
+          StreamingPubsubIORead.class, new 
StreamingPubsubIOReadTranslator<>());
     }
   }
 
   /**
    * Rewrite {@link StreamingPubsubIORead} to the appropriate internal node.
    */
-  private static class StreamingPubsubIOReadTranslator implements
-      TransformTranslator<StreamingPubsubIORead> {
+  private static class StreamingPubsubIOReadTranslator<T> implements
+      TransformTranslator<StreamingPubsubIORead<T>> {
     @Override
-    @SuppressWarnings({"rawtypes", "unchecked"})
     public void translate(
-        StreamingPubsubIORead transform,
-        TranslationContext context) {
-      translateTyped(transform, context);
-    }
-
-    private <T> void translateTyped(
         StreamingPubsubIORead<T> transform,
         TranslationContext context) {
       checkArgument(context.getPipelineOptions().isStreaming(),
@@ -2157,25 +2150,18 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
 
     static {
       DataflowPipelineTranslator.registerTransformTranslator(
-          StreamingPubsubIOWrite.class, new 
StreamingPubsubIOWriteTranslator());
+          StreamingPubsubIOWrite.class, new 
StreamingPubsubIOWriteTranslator<>());
     }
   }
 
   /**
    * Rewrite {@link StreamingPubsubIOWrite} to the appropriate internal node.
    */
-  private static class StreamingPubsubIOWriteTranslator implements
-      TransformTranslator<StreamingPubsubIOWrite> {
+  private static class StreamingPubsubIOWriteTranslator<T> implements
+      TransformTranslator<StreamingPubsubIOWrite<T>> {
 
     @Override
-    @SuppressWarnings({"rawtypes", "unchecked"})
     public void translate(
-        StreamingPubsubIOWrite transform,
-        TranslationContext context) {
-      translateTyped(transform, context);
-    }
-
-    private <T> void translateTyped(
         StreamingPubsubIOWrite<T> transform,
         TranslationContext context) {
       checkArgument(context.getPipelineOptions().isStreaming(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 6f4a18b..bb8daf3 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -240,8 +240,9 @@ public class IsmFormat {
     }
 
     /** Returns the key coder at the specified index. */
-    public Coder getKeyComponentCoder(int index) {
-      return keyComponentCoders.get(index);
+    @SuppressWarnings("unchecked")
+    public <T> Coder<T> getKeyComponentCoder(int index) {
+      return (Coder<T>) keyComponentCoders.get(index);
     }
 
     /** Returns the value coder. */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index 36f154f..7617689 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -303,7 +303,7 @@ public class PubsubUnboundedSource<T> extends 
PTransform<PBegin, PCollection<T>>
     /**
      * Return current time according to {@code reader}.
      */
-    private static long now(PubsubReader reader) {
+    private static long now(PubsubReader<?> reader) {
       if (reader.outer.outer.clock == null) {
         return System.currentTimeMillis();
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 701ae70..3e810e9 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -279,13 +279,14 @@ public interface PipelineOptions extends HasDisplayData {
    * as the {@link Default}. However, it should still be used if available, 
and a user is required
    * to explicitly set the {@code --runner} property if they wish to use an 
alternative runner.
    */
-  class DirectRunner implements DefaultValueFactory<Class<? extends 
PipelineRunner>> {
+  class DirectRunner implements DefaultValueFactory<Class<? extends 
PipelineRunner<?>>> {
     @Override
-    public Class<? extends PipelineRunner> create(PipelineOptions options) {
+    public Class<? extends PipelineRunner<?>> create(PipelineOptions options) {
       try {
         @SuppressWarnings({"unchecked", "rawtypes"})
-        Class<? extends PipelineRunner> direct = (Class<? extends 
PipelineRunner>) Class.forName(
-            "org.apache.beam.runners.direct.DirectRunner");
+        Class<? extends PipelineRunner<?>> direct =
+            (Class<? extends PipelineRunner<?>>)
+                Class.forName("org.apache.beam.runners.direct.DirectRunner");
         return direct;
       } catch (ClassNotFoundException e) {
         throw new IllegalArgumentException(String.format(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index e2730ed..509bb24 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -105,7 +105,7 @@ public final class TestStream<T> extends PTransform<PBegin, 
PCollection<T>> {
     @SafeVarargs
     public final Builder<T> addElements(T element, T... elements) {
       TimestampedValue<T> firstElement = TimestampedValue.of(element, 
currentWatermark);
-      @SuppressWarnings("unchecked")
+      @SuppressWarnings({"unchecked", "rawtypes"})
       TimestampedValue<T>[] remainingElements = new 
TimestampedValue[elements.length];
       for (int i = 0; i < elements.length; i++) {
         remainingElements[i] = TimestampedValue.of(elements[i], 
currentWatermark);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 7b259aa..3eee74a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -58,8 +58,9 @@ public class DoFnAdapters {
   }
 
   /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
+  @SuppressWarnings({"unchecked", "rawtypes"})
   public static <InputT, OutputT> OldDoFn<InputT, OutputT> 
toOldDoFn(DoFn<InputT, OutputT> fn) {
-    DoFnSignature signature = 
DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass());
+    DoFnSignature signature = 
DoFnSignatures.INSTANCE.getOrParseSignature((Class) fn.getClass());
     if (signature.processElement().usesSingleWindow()) {
       return new WindowDoFnAdapter<>(fn);
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 041eb60..da88587 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -164,9 +164,11 @@ public class DoFnInvokers {
   }
 
   /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
+  @SuppressWarnings({"unchecked", "rawtypes"})
   public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
       DoFn<InputT, OutputT> fn) {
-    return 
newByteBuddyInvoker(DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass()), 
fn);
+    return newByteBuddyInvoker(DoFnSignatures.INSTANCE.getOrParseSignature(
+        (Class) fn.getClass()), fn);
   }
 
   /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
@@ -198,7 +200,7 @@ public class DoFnInvokers {
    */
   private synchronized Constructor<?> getOrGenerateByteBuddyInvokerConstructor(
       DoFnSignature signature) {
-    Class<? extends DoFn> fnClass = signature.fnClass();
+    Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
     Constructor<?> constructor = byteBuddyInvokerConstructorCache.get(fnClass);
     if (constructor == null) {
       Class<? extends DoFnInvoker<?, ?>> invokerClass = 
generateInvokerClass(signature);
@@ -214,7 +216,7 @@ public class DoFnInvokers {
 
   /** Generates a {@link DoFnInvoker} class for the given {@link 
DoFnSignature}. */
   private static Class<? extends DoFnInvoker<?, ?>> 
generateInvokerClass(DoFnSignature signature) {
-    Class<? extends DoFn> fnClass = signature.fnClass();
+    Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
 
     final TypeDescription clazzDescription = new 
TypeDescription.ForLoadedType(fnClass);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index b6864da..756df07 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 @AutoValue
 public abstract class DoFnSignature {
   /** Class of the original {@link DoFn} from which this signature was 
produced. */
-  public abstract Class<? extends DoFn> fnClass();
+  public abstract Class<? extends DoFn<?, ?>> fnClass();
 
   /** Details about this {@link DoFn}'s {@link DoFn.ProcessElement} method. */
   public abstract ProcessElementMethod processElement();
@@ -60,7 +60,7 @@ public abstract class DoFnSignature {
 
   @AutoValue.Builder
   abstract static class Builder {
-    abstract Builder setFnClass(Class<? extends DoFn> fnClass);
+    abstract Builder setFnClass(Class<? extends DoFn<?, ?>> fnClass);
     abstract Builder setProcessElement(ProcessElementMethod processElement);
     abstract Builder setStartBundle(BundleMethod startBundle);
     abstract Builder setFinishBundle(BundleMethod finishBundle);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 8283788..ad15127 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -49,8 +49,7 @@ public class DoFnSignatures {
   private final Map<Class<?>, DoFnSignature> signatureCache = new 
LinkedHashMap<>();
 
   /** @return the {@link DoFnSignature} for the given {@link DoFn}. */
-  public synchronized DoFnSignature getOrParseSignature(
-      @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
+  public synchronized <FnT extends DoFn<?, ?>> DoFnSignature 
getOrParseSignature(Class<FnT> fn) {
     DoFnSignature signature = signatureCache.get(fn);
     if (signature == null) {
       signatureCache.put(fn, signature = parseSignature(fn));
@@ -59,14 +58,14 @@ public class DoFnSignatures {
   }
 
   /** Analyzes a given {@link DoFn} class and extracts its {@link 
DoFnSignature}. */
-  private static DoFnSignature parseSignature(Class<? extends DoFn> fnClass) {
+  private static DoFnSignature parseSignature(Class<? extends DoFn<?, ?>> 
fnClass) {
     DoFnSignature.Builder builder = DoFnSignature.builder();
 
     ErrorReporter errors = new ErrorReporter(null, fnClass.getName());
     errors.checkArgument(DoFn.class.isAssignableFrom(fnClass), "Must be 
subtype of DoFn");
     builder.setFnClass(fnClass);
 
-    TypeToken<? extends DoFn> fnToken = TypeToken.of(fnClass);
+    TypeToken<? extends DoFn<?, ?>> fnToken = TypeToken.of(fnClass);
 
     // Extract the input and output type, and whether the fn is bounded.
     TypeToken<?> inputT = null;
@@ -163,7 +162,7 @@ public class DoFnSignatures {
   @VisibleForTesting
   static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod(
       ErrorReporter errors,
-      TypeToken<? extends DoFn> fnClass,
+      TypeToken<? extends DoFn<?, ?>> fnClass,
       Method m,
       TypeToken<?> inputT,
       TypeToken<?> outputT) {
@@ -228,7 +227,7 @@ public class DoFnSignatures {
   @VisibleForTesting
   static DoFnSignature.BundleMethod analyzeBundleMethod(
       ErrorReporter errors,
-      TypeToken<? extends DoFn> fnToken,
+      TypeToken<? extends DoFn<?, ?>> fnToken,
       Method m,
       TypeToken<?> inputT,
       TypeToken<?> outputT) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index d690a47..530d755 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -395,6 +395,7 @@ public class CoderRegistryTest {
   private static class TestGenericClass<TestGenericT> { }
 
   @Test
+  @SuppressWarnings("rawtypes")
   public void testSerializableTypeVariableDefaultCoder() throws Exception {
     CoderRegistry registry = new CoderRegistry();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index f26667f..a9ec7e4 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -58,7 +58,7 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class PipelineOptionsFactoryTest {
   private static final String DEFAULT_RUNNER_NAME = "DirectRunner";
-  private static final Class<? extends PipelineRunner> REGISTERED_RUNNER =
+  private static final Class<? extends PipelineRunner<?>> REGISTERED_RUNNER =
       RegisteredTestRunner.class;
 
   @Rule public ExpectedException expectedException = ExpectedException.none();
@@ -753,7 +753,9 @@ public class PipelineOptionsFactoryTest {
     void setString(List<String> value);
     List<Integer> getInteger();
     void setInteger(List<Integer> value);
+    @SuppressWarnings("rawtypes")
     List getList();
+    @SuppressWarnings("rawtypes")
     void setList(List value);
   }
 
@@ -1242,7 +1244,7 @@ public class PipelineOptionsFactoryTest {
   }
 
   private static class RegisteredTestRunner extends 
PipelineRunner<PipelineResult> {
-    public static PipelineRunner fromOptions(PipelineOptions options) {
+    public static PipelineRunner<PipelineResult> fromOptions(PipelineOptions 
options) {
       return new RegisteredTestRunner();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index ed65f15..03563f3 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -124,8 +124,8 @@ public class TestPipelineTest {
   @Test
   public void testMatcherSerializationDeserialization() {
     TestPipelineOptions opts = 
PipelineOptionsFactory.as(TestPipelineOptions.class);
-    SerializableMatcher m1 = new TestMatcher();
-    SerializableMatcher m2 = new TestMatcher();
+    SerializableMatcher<PipelineResult> m1 = new TestMatcher();
+    SerializableMatcher<PipelineResult> m2 = new TestMatcher();
 
     opts.setOnCreateMatcher(m1);
     opts.setOnSuccessMatcher(m2);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index be061af..5ce8055 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -38,6 +38,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -95,34 +96,27 @@ public class CombineTest implements Serializable {
   // This test is Serializable, just so that it's easy to have
   // anonymous inner classes inside the non-static test methods.
 
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  static final KV<String, Integer>[] TABLE = new KV[] {
+  static final List<KV<String, Integer>> TABLE = Arrays.asList(
     KV.of("a", 1),
     KV.of("a", 1),
     KV.of("a", 4),
     KV.of("b", 1),
-    KV.of("b", 13),
-  };
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  static final KV<String, Integer>[] EMPTY_TABLE = new KV[] {
-  };
+    KV.of("b", 13)
+  );
 
-  static final Integer[] NUMBERS = new Integer[] {
-    1, 1, 2, 3, 5, 8, 13, 21, 34, 55
-  };
+  static final List<KV<String, Integer>> EMPTY_TABLE = Collections.emptyList();
 
   @Mock private DoFn<?, ?>.ProcessContext processContext;
 
   PCollection<KV<String, Integer>> createInput(Pipeline p,
-                                               KV<String, Integer>[] table) {
-    return p.apply(Create.of(Arrays.asList(table)).withCoder(
+                                               List<KV<String, Integer>> 
table) {
+    return p.apply(Create.of(table).withCoder(
         KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
   }
 
-  private void runTestSimpleCombine(KV<String, Integer>[] table,
+  private void runTestSimpleCombine(List<KV<String, Integer>> table,
                                     int globalSum,
-                                    KV<String, String>[] perKeyCombines) {
+                                    List<KV<String, String>> perKeyCombines) {
     Pipeline pipeline = TestPipeline.create();
     PCollection<KV<String, Integer>> input = createInput(pipeline, table);
 
@@ -140,9 +134,9 @@ public class CombineTest implements Serializable {
     pipeline.run();
   }
 
-  private void runTestSimpleCombineWithContext(KV<String, Integer>[] table,
+  private void runTestSimpleCombineWithContext(List<KV<String, Integer>> table,
                                                int globalSum,
-                                               KV<String, String>[] 
perKeyCombines,
+                                               List<KV<String, String>> 
perKeyCombines,
                                                String[] globallyCombines) {
     Pipeline pipeline = TestPipeline.create();
     PCollection<KV<String, Integer>> perKeyInput = createInput(pipeline, 
table);
@@ -174,8 +168,7 @@ public class CombineTest implements Serializable {
   @Category(RunnableOnService.class)
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testSimpleCombine() {
-    runTestSimpleCombine(TABLE, 20, new KV[] {
-        KV.of("a", "114a"), KV.of("b", "113b") });
+    runTestSimpleCombine(TABLE, 20, Arrays.asList(KV.of("a", "114a"), 
KV.of("b", "113b")));
   }
 
   @Test
@@ -183,28 +176,27 @@ public class CombineTest implements Serializable {
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testSimpleCombineWithContext() {
     runTestSimpleCombineWithContext(TABLE, 20,
-        new KV[] {KV.of("a", "01124a"), KV.of("b", "01123b") },
+        Arrays.asList(KV.of("a", "01124a"), KV.of("b", "01123b")),
         new String[] {"01111234G"});
   }
 
   @Test
   @Category(RunnableOnService.class)
-  @SuppressWarnings({"rawtypes", "unchecked"})
   public void testSimpleCombineWithContextEmpty() {
-    runTestSimpleCombineWithContext(EMPTY_TABLE, 0, new KV[] {}, new String[] 
{});
+    runTestSimpleCombineWithContext(
+        EMPTY_TABLE, 0, Collections.<KV<String, String>>emptyList(), new 
String[] {});
   }
 
   @Test
   @Category(RunnableOnService.class)
-  @SuppressWarnings({"rawtypes", "unchecked"})
   public void testSimpleCombineEmpty() {
-    runTestSimpleCombine(EMPTY_TABLE, 0, new KV[] { });
+    runTestSimpleCombine(EMPTY_TABLE, 0, Collections.<KV<String, 
String>>emptyList());
   }
 
   @SuppressWarnings("unchecked")
-  private void runTestBasicCombine(KV<String, Integer>[] table,
+  private void runTestBasicCombine(List<KV<String, Integer>> table,
                                    Set<Integer> globalUnique,
-                                   KV<String, Set<Integer>>[] perKeyUnique) {
+                                   List<KV<String, Set<Integer>>> 
perKeyUnique) {
     Pipeline pipeline = TestPipeline.create();
     pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class);
     PCollection<KV<String, Integer>> input = createInput(pipeline, table);
@@ -225,23 +217,22 @@ public class CombineTest implements Serializable {
 
   @Test
   @Category(RunnableOnService.class)
-  @SuppressWarnings({"rawtypes", "unchecked"})
   public void testBasicCombine() {
-    runTestBasicCombine(TABLE, ImmutableSet.of(1, 13, 4), new KV[] {
+    runTestBasicCombine(TABLE, ImmutableSet.of(1, 13, 4), Arrays.asList(
         KV.of("a", (Set<Integer>) ImmutableSet.of(1, 4)),
-        KV.of("b", (Set<Integer>) ImmutableSet.of(1, 13)) });
+        KV.of("b", (Set<Integer>) ImmutableSet.of(1, 13))));
   }
 
   @Test
   @Category(RunnableOnService.class)
-  @SuppressWarnings("rawtypes")
   public void testBasicCombineEmpty() {
-    runTestBasicCombine(EMPTY_TABLE, ImmutableSet.<Integer>of(), new KV[] { });
+    runTestBasicCombine(
+        EMPTY_TABLE, ImmutableSet.<Integer>of(), Collections.<KV<String, 
Set<Integer>>>emptyList());
   }
 
-  private void runTestAccumulatingCombine(KV<String, Integer>[] table,
+  private void runTestAccumulatingCombine(List<KV<String, Integer>> table,
                                           Double globalMean,
-                                          KV<String, Double>[] perKeyMeans) {
+                                          List<KV<String, Double>> 
perKeyMeans) {
     Pipeline pipeline = TestPipeline.create();
     PCollection<KV<String, Integer>> input = createInput(pipeline, table);
 
@@ -265,8 +256,7 @@ public class CombineTest implements Serializable {
     Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> input =
-        pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
-                                   Arrays.asList(0L, 1L, 6L, 7L, 8L))
+        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 
8L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())))
          .apply(Window.<KV<String, 
Integer>>into(FixedWindows.of(Duration.millis(2))));
 
@@ -292,8 +282,7 @@ public class CombineTest implements Serializable {
     Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> perKeyInput =
-        pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
-                                   Arrays.asList(0L, 1L, 6L, 7L, 8L))
+        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 
8L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())))
          .apply(Window.<KV<String, 
Integer>>into(FixedWindows.of(Duration.millis(2))));
 
@@ -330,8 +319,7 @@ public class CombineTest implements Serializable {
     Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> perKeyInput =
-        pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
-                                   Arrays.asList(2L, 3L, 8L, 9L, 10L))
+        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(2L, 3L, 8L, 9L, 
10L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())))
          .apply(Window.<KV<String, 
Integer>>into(SlidingWindows.of(Duration.millis(2))));
 
@@ -407,8 +395,7 @@ public class CombineTest implements Serializable {
     Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> input =
-        pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
-                                   Arrays.asList(0L, 4L, 7L, 10L, 16L))
+        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 
10L, 16L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())))
          .apply(Window.<KV<String, 
Integer>>into(Sessions.withGapDuration(Duration.millis(5))));
 
@@ -433,8 +420,7 @@ public class CombineTest implements Serializable {
     Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> perKeyInput =
-        pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
-                                   Arrays.asList(0L, 4L, 7L, 10L, 16L))
+        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 
10L, 16L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())));
 
     PCollection<Integer> globallyInput = 
perKeyInput.apply(Values.<Integer>create());
@@ -488,14 +474,13 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testAccumulatingCombine() {
-    runTestAccumulatingCombine(TABLE, 4.0, new KV[] {
-        KV.of("a", 2.0), KV.of("b", 7.0) });
+    runTestAccumulatingCombine(TABLE, 4.0, Arrays.asList(KV.of("a", 2.0), 
KV.of("b", 7.0)));
   }
 
   @Test
   @Category(RunnableOnService.class)
   public void testAccumulatingCombineEmpty() {
-    runTestAccumulatingCombine(EMPTY_TABLE, 0.0, new KV[] { });
+    runTestAccumulatingCombine(EMPTY_TABLE, 0.0, Collections.<KV<String, 
Double>>emptyList());
   }
 
   // Checks that Min, Max, Mean, Sum (operations that pass-through to Combine),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 0a4b3cd..7ce98bc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1461,8 +1461,8 @@ public class ParDoTest implements Serializable {
       }
     };
 
-    ParDo.BoundMulti parDo = ParDo
-            .withOutputTags(new TupleTag(), TupleTagList.empty())
+    ParDo.BoundMulti<String, String> parDo = ParDo
+            .withOutputTags(new TupleTag<String>(), TupleTagList.empty())
             .of(fn);
 
     DisplayData displayData = DisplayData.from(parDo);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
index db9aea3..b758ed6 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
@@ -117,7 +117,7 @@ public class DisplayDataEvaluator {
     return displayDataForPipeline(pipeline, root);
   }
 
-  private static Set<DisplayData> displayDataForPipeline(Pipeline pipeline, 
PTransform root) {
+  private static Set<DisplayData> displayDataForPipeline(Pipeline pipeline, 
PTransform<?, ?> root) {
     PrimitiveDisplayDataPTransformVisitor visitor = new 
PrimitiveDisplayDataPTransformVisitor(root);
     pipeline.traverseTopologically(visitor);
     return visitor.getPrimitivesDisplayData();
@@ -129,11 +129,11 @@ public class DisplayDataEvaluator {
    */
   private static class PrimitiveDisplayDataPTransformVisitor
   extends Pipeline.PipelineVisitor.Defaults {
-    private final PTransform root;
+    private final PTransform<?, ?> root;
     private final Set<DisplayData> displayData;
     private boolean inCompositeRoot = false;
 
-    PrimitiveDisplayDataPTransformVisitor(PTransform root) {
+    PrimitiveDisplayDataPTransformVisitor(PTransform<?, ?> root) {
       this.root = root;
       this.displayData = Sets.newHashSet();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 97d810c..d057765 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -42,7 +42,7 @@ import org.mockito.MockitoAnnotations;
 public class DoFnInvokersTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
 
-  @Mock private DoFn.ProcessContext mockContext;
+  @Mock private DoFn<String, String>.ProcessContext mockContext;
   @Mock private BoundedWindow mockWindow;
   @Mock private DoFn.InputProvider<String> mockInputProvider;
   @Mock private DoFn.OutputReceiver<String> mockOutputReceiver;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
index 2629995..92fe5a6 100644
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
+++ 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
@@ -125,7 +125,7 @@ public class AvroHDFSFileSource<T> extends 
HDFSFileSource<AvroKey<T>, NullWritab
 
       // clone the record to work around identical element issue due to object 
reuse
       Coder<T> avroCoder = ((AvroHDFSFileSource<T>) 
this.getCurrentSource()).avroCoder;
-      key = new AvroKey(CoderUtils.clone(avroCoder, key.datum()));
+      key = new AvroKey<>(CoderUtils.clone(avroCoder, key.datum()));
 
       return KV.of(key, value);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
index c1340c0..45a8037 100644
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
+++ 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
@@ -64,7 +64,7 @@ public class AvroWrapperCoder<WrapperT extends 
AvroWrapper<DatumT>, DatumT>
   }
 
   @JsonCreator
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings({"unchecked", "rawtypes"})
   public static AvroWrapperCoder<?, ?> of(
       @JsonProperty("wrapperType") String wrapperType,
       @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> 
components)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index 3a4d01f..c71a58c 100644
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -120,10 +120,7 @@ public class HDFSFileSource<K, V> extends 
BoundedSource<KV<K, V>> {
    */
   public static <K, V, T extends FileInputFormat<K, V>> HDFSFileSource<K, V> 
from(
       String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> 
valueClass) {
-    @SuppressWarnings("unchecked")
-    HDFSFileSource<K, V> source = (HDFSFileSource<K, V>)
-        new HDFSFileSource(filepattern, formatClass, keyClass, valueClass);
-    return source;
+    return new HDFSFileSource<>(filepattern, formatClass, keyClass, 
valueClass);
   }
 
   /**
@@ -271,7 +268,7 @@ public class HDFSFileSource<K, V> extends 
BoundedSource<KV<K, V>> {
 
     private final BoundedSource<KV<K, V>> source;
     private final String filepattern;
-    private final Class formatClass;
+    private final Class<? extends FileInputFormat<?, ?>> formatClass;
     protected Job job;
 
     private FileInputFormat<?, ?> format;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
index 6fb340e..d2cab57 100644
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
+++ 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
@@ -93,10 +93,7 @@ public class SimpleAuthHDFSFileSource<K, V> extends 
HDFSFileSource<K, V> {
       Class<K> keyClass,
       Class<V> valueClass,
       String username) {
-    @SuppressWarnings("unchecked")
-    HDFSFileSource<K, V> source = (HDFSFileSource<K, V>)
-        new SimpleAuthHDFSFileSource(filepattern, formatClass, keyClass, 
valueClass, username);
-    return source;
+    return new SimpleAuthHDFSFileSource<>(filepattern, formatClass, keyClass, 
valueClass, username);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java 
b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 3107aab..1c35f6e 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -255,7 +255,7 @@ public class JmsIO {
     }
 
     @Override
-    public Coder getCheckpointMarkCoder() {
+    public Coder<JmsCheckpointMark> getCheckpointMarkCoder() {
       return AvroCoder.of(JmsCheckpointMark.class);
     }
 
@@ -319,6 +319,7 @@ public class JmsIO {
         }
 
         Map<String, Object> properties = new HashMap<>();
+        @SuppressWarnings("rawtypes")
         Enumeration propertyNames = message.getPropertyNames();
         while (propertyNames.hasMoreElements()) {
           String propertyName = (String) propertyNames.nextElement();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
index 804d6cc..4515f38 100644
--- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
+++ 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
@@ -23,8 +23,9 @@ import java.util.NoSuchElementException;
  * Similar to Guava {@code Optional}, but throws {@link 
NoSuchElementException} for missing element.
  */
 abstract class CustomOptional<T> {
+    @SuppressWarnings("unchecked")
     public static <T> CustomOptional<T> absent() {
-        return Absent.INSTANCE;
+        return (Absent<T>) Absent.INSTANCE;
     }
 
     public static <T> CustomOptional<T> of(T v) {
@@ -67,7 +68,7 @@ abstract class CustomOptional<T> {
     }
 
     private static class Absent<T> extends CustomOptional<T> {
-        public static final Absent INSTANCE = new Absent();
+        private static final Absent<Object> INSTANCE = new Absent<>();
 
         private Absent() {
         }

Reply via email to