[ 
https://issues.apache.org/jira/browse/BEAM-4311?focusedWorklogId=110077&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110077
 ]

ASF GitHub Bot logged work on BEAM-4311:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Jun/18 11:21
            Start Date: 08/Jun/18 11:21
    Worklog Time Spent: 10m 
      Work Description: aljoscha closed pull request #5572: [BEAM-4311] Enforce 
ErrorProne analysis in Flink runner
URL: https://github.com/apache/beam/pull/5572
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index 17d55e8856e..7dbe5acb774 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -19,7 +19,8 @@
 import groovy.json.JsonOutput
 
 apply from: project(":").file("build_rules.gradle")
-applyJavaNature()
+applyJavaNature(failOnWarning: true)
+
 
 description = "Apache Beam :: Runners :: Flink"
 
@@ -48,6 +49,7 @@ def flink_version = "1.4.0"
 
 dependencies {
   compile library.java.guava
+  compileOnly library.java.findbugs_annotations
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow project(path: ":beam-runners-core-java", configuration: "shadow")
   shadow project(path: ":beam-runners-core-construction-java", configuration: 
"shadow")
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index a06cea52909..5d3601da0b1 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink;
 
+import com.google.common.base.Splitter;
 import java.util.List;
 import org.apache.flink.api.java.CollectionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -51,12 +52,12 @@ public static ExecutionEnvironment 
createBatchExecutionEnvironment(FlinkPipeline
     } else if ("[auto]".equals(masterUrl)) {
       flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
     } else if (masterUrl.matches(".*:\\d*")) {
-      String[] parts = masterUrl.split(":");
+      List<String> parts = Splitter.on(':').splitToList(masterUrl);
       List<String> stagingFiles = options.getFilesToStage();
       flinkBatchEnv =
           ExecutionEnvironment.createRemoteEnvironment(
-              parts[0],
-              Integer.parseInt(parts[1]),
+              parts.get(0),
+              Integer.parseInt(parts.get(1)),
               stagingFiles.toArray(new String[stagingFiles.size()]));
     } else {
       LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", 
masterUrl);
@@ -99,12 +100,12 @@ public static StreamExecutionEnvironment 
createStreamExecutionEnvironment(
     } else if ("[auto]".equals(masterUrl)) {
       flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     } else if (masterUrl.matches(".*:\\d*")) {
-      String[] parts = masterUrl.split(":");
+      List<String> parts = Splitter.on(':').splitToList(masterUrl);
       List<String> stagingFiles = options.getFilesToStage();
       flinkStreamEnv =
           StreamExecutionEnvironment.createRemoteEnvironment(
-              parts[0],
-              Integer.parseInt(parts[1]),
+              parts.get(0),
+              Integer.parseInt(parts.get(1)),
               stagingFiles.toArray(new String[stagingFiles.size()]));
     } else {
       LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", 
masterUrl);
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 7a55bc51974..5d9e6b93ede 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -401,18 +401,18 @@ public void close() throws Exception {
       }
     } finally {
       super.close();
+    }
 
-      // sanity check: these should have been flushed out by +Inf watermarks
-      if (!sideInputs.isEmpty() && nonKeyedStateInternals != null) {
-        BagState<WindowedValue<InputT>> pushedBack =
-            nonKeyedStateInternals.state(StateNamespaces.global(), 
pushedBackTag);
+    // sanity check: these should have been flushed out by +Inf watermarks
+    if (!sideInputs.isEmpty() && nonKeyedStateInternals != null) {
+      BagState<WindowedValue<InputT>> pushedBack =
+          nonKeyedStateInternals.state(StateNamespaces.global(), 
pushedBackTag);
 
-        Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
-        if (!Iterables.isEmpty(pushedBackContents)) {
-          String pushedBackString = Joiner.on(",").join(pushedBackContents);
-          throw new RuntimeException(
-              "Leftover pushed-back data: " + pushedBackString + ". This 
indicates a bug.");
-        }
+      Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
+      if (!Iterables.isEmpty(pushedBackContents)) {
+        String pushedBackString = Joiner.on(",").join(pushedBackContents);
+        throw new RuntimeException(
+            "Leftover pushed-back data: " + pushedBackString + ". This 
indicates a bug.");
       }
     }
   }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
index b8b40fe0616..6df1101c31c 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
@@ -122,7 +122,10 @@ public void snapshotKeyGroupState(int keyGroupIndex, 
DataOutputStream out) throw
     Set<ByteBuffer> ids = dedupingCache.get(keyGroupIndex).asMap().keySet();
     VarIntCoder.of().encode(ids.size(), out, Context.NESTED);
     for (ByteBuffer id : ids) {
-      ByteArrayCoder.of().encode(id.array(), out, Context.NESTED);
+      byte[] bytes = new byte[id.remaining()];
+      id.get(bytes);
+      id.position(id.position() - bytes.length);
+      ByteArrayCoder.of().encode(bytes, out, Context.NESTED);
     }
   }
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
index 03681725ca9..6d75a92e1a1 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -25,6 +25,7 @@
 import java.io.InputStreamReader;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -152,7 +153,9 @@ private void openConnection() throws IOException {
       this.socket = new Socket();
       this.socket.connect(new InetSocketAddress(this.source.getHostname(), 
this.source.getPort()),
           CONNECTION_TIMEOUT_TIME);
-      this.reader = new BufferedReader(new 
InputStreamReader(this.socket.getInputStream()));
+      this.reader =
+          new BufferedReader(
+              new InputStreamReader(this.socket.getInputStream(), 
StandardCharsets.UTF_8));
       this.isRunning = true;
     }
 
@@ -175,7 +178,7 @@ public boolean start() throws IOException {
             try {
               Thread.sleep(this.source.getDelayBetweenRetries());
             } catch (InterruptedException e1) {
-              e1.printStackTrace();
+              LOG.error("Interrupted during retry delay", e1);
             }
           } else {
             this.isRunning = false;
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 3f04b6c8377..fc349f392f2 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -436,7 +436,7 @@ public void initializeState(FunctionInitializationContext 
context) throws Except
   }
 
   @Override
-  public void onProcessingTime(long timestamp) throws Exception {
+  public void onProcessingTime(long timestamp) {
     if (this.isRunning) {
       synchronized (context.getCheckpointLock()) {
         // find minimum watermark over all localReaders
@@ -457,9 +457,11 @@ public void onProcessingTime(long timestamp) throws 
Exception {
     }
   }
 
+  // the callback is ourselves so there is nothing meaningful we can do with 
the ScheduledFuture
+  @SuppressWarnings("FutureReturnValueIgnored")
   private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
     if (this.isRunning) {
-      long watermarkInterval =  
runtime.getExecutionConfig().getAutoWatermarkInterval();
+      long watermarkInterval = 
runtime.getExecutionConfig().getAutoWatermarkInterval();
       long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval);
       runtime.getProcessingTimeService().registerTimer(timeToNextWatermark, 
this);
     }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index 3288be1c5df..d7a42e1a8bc 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -92,22 +92,22 @@ public K getKey() {
         new StateTag.StateBinder() {
 
           @Override
-          public <T> ValueState<T> bindValue(
-              StateTag<ValueState<T>> address, Coder<T> coder) {
+          public <T2> ValueState<T2> bindValue(
+              StateTag<ValueState<T2>> address, Coder<T2> coder) {
 
             return new FlinkBroadcastValueState<>(stateBackend, address, 
namespace, coder);
           }
 
           @Override
-          public <T> BagState<T> bindBag(
-              StateTag<BagState<T>> address, Coder<T> elemCoder) {
+          public <T2> BagState<T2> bindBag(
+              StateTag<BagState<T2>> address, Coder<T2> elemCoder) {
 
             return new FlinkBroadcastBagState<>(stateBackend, address, 
namespace, elemCoder);
           }
 
           @Override
-          public <T> SetState<T> bindSet(
-              StateTag<SetState<T>> address, Coder<T> elemCoder) {
+          public <T2> SetState<T2> bindSet(
+              StateTag<SetState<T2>> address, Coder<T2> elemCoder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", 
SetState.class.getSimpleName()));
           }
@@ -289,7 +289,7 @@ void clearInternal() {
 
   }
 
-  private class FlinkBroadcastValueState<K, T>
+  private class FlinkBroadcastValueState<T>
       extends AbstractBroadcastState<T> implements ValueState<T> {
 
     private final StateNamespace namespace;
@@ -331,7 +331,7 @@ public boolean equals(Object o) {
         return false;
       }
 
-      FlinkBroadcastValueState<?, ?> that = (FlinkBroadcastValueState<?, ?>) o;
+      FlinkBroadcastValueState<?> that = (FlinkBroadcastValueState<?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 
@@ -350,7 +350,7 @@ public void clear() {
     }
   }
 
-  private class FlinkBroadcastBagState<K, T> extends 
AbstractBroadcastState<List<T>>
+  private class FlinkBroadcastBagState<T> extends 
AbstractBroadcastState<List<T>>
       implements BagState<T> {
 
     private final StateNamespace namespace;
@@ -423,7 +423,7 @@ public boolean equals(Object o) {
         return false;
       }
 
-      FlinkBroadcastBagState<?, ?> that = (FlinkBroadcastBagState<?, ?>) o;
+      FlinkBroadcastBagState<?> that = (FlinkBroadcastBagState<?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 
@@ -437,7 +437,7 @@ public int hashCode() {
     }
   }
 
-  private class FlinkCombiningState<K, InputT, AccumT, OutputT>
+  private class FlinkCombiningState<InputT, AccumT, OutputT>
       extends AbstractBroadcastState<AccumT>
       implements CombiningState<InputT, AccumT, OutputT> {
 
@@ -539,8 +539,8 @@ public boolean equals(Object o) {
         return false;
       }
 
-      FlinkCombiningState<?, ?, ?, ?> that =
-          (FlinkCombiningState<?, ?, ?, ?>) o;
+      FlinkCombiningState<?, ?, ?> that =
+          (FlinkCombiningState<?, ?, ?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 
@@ -554,14 +554,14 @@ public int hashCode() {
     }
   }
 
-  private class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
+  private class FlinkKeyedCombiningState<K2, InputT, AccumT, OutputT>
       extends AbstractBroadcastState<AccumT>
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
     private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-    private final FlinkBroadcastStateInternals<K> flinkStateInternals;
+    private final FlinkBroadcastStateInternals<K2> flinkStateInternals;
 
     FlinkKeyedCombiningState(
         OperatorStateBackend flinkStateBackend,
@@ -569,7 +569,7 @@ public int hashCode() {
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
-        FlinkBroadcastStateInternals<K> flinkStateInternals) {
+        FlinkBroadcastStateInternals<K2> flinkStateInternals) {
       super(flinkStateBackend, address.getId(), namespace, accumCoder);
 
       this.namespace = namespace;
@@ -690,14 +690,14 @@ public int hashCode() {
     }
   }
 
-  private class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
+  private class FlinkCombiningStateWithContext<K2, InputT, AccumT, OutputT>
       extends AbstractBroadcastState<AccumT>
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
     private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final CombineWithContext.CombineFnWithContext<InputT, AccumT, 
OutputT> combineFn;
-    private final FlinkBroadcastStateInternals<K> flinkStateInternals;
+    private final FlinkBroadcastStateInternals<K2> flinkStateInternals;
     private final CombineWithContext.Context context;
 
     FlinkCombiningStateWithContext(
@@ -706,7 +706,7 @@ public int hashCode() {
         CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> 
combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
-        FlinkBroadcastStateInternals<K> flinkStateInternals,
+        FlinkBroadcastStateInternals<K2> flinkStateInternals,
         CombineWithContext.Context context) {
       super(flinkStateBackend, address.getId(), namespace, accumCoder);
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index d1907716ddc..ae5881e6a35 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -97,7 +97,10 @@ public FlinkKeyGroupStateInternals(
   public K getKey() {
     ByteBuffer keyBytes = (ByteBuffer) keyedStateBackend.getCurrentKey();
     try {
-      return CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
+      byte[] bytes = new byte[keyBytes.remaining()];
+      keyBytes.get(bytes);
+      keyBytes.position(keyBytes.position() - bytes.length);
+      return CoderUtils.decodeFromByteArray(keyCoder, bytes);
     } catch (CoderException e) {
       throw new RuntimeException("Error decoding key.", e);
     }
@@ -113,22 +116,22 @@ public K getKey() {
         new StateTag.StateBinder() {
 
           @Override
-          public <T> ValueState<T> bindValue(
-              StateTag<ValueState<T>> address, Coder<T> coder) {
+          public <T2> ValueState<T2> bindValue(
+              StateTag<ValueState<T2>> address, Coder<T2> coder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", 
ValueState.class.getSimpleName()));
           }
 
           @Override
-          public <T> BagState<T> bindBag(
-              StateTag<BagState<T>> address, Coder<T> elemCoder) {
+          public <T2> BagState<T2> bindBag(
+              StateTag<BagState<T2>> address, Coder<T2> elemCoder) {
 
             return new FlinkKeyGroupBagState<>(address, namespace, elemCoder);
           }
 
           @Override
-          public <T> SetState<T> bindSet(
-              StateTag<SetState<T>> address, Coder<T> elemCoder) {
+          public <T2> SetState<T2> bindSet(
+              StateTag<SetState<T2>> address, Coder<T2> elemCoder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", 
SetState.class.getSimpleName()));
           }
@@ -295,7 +298,7 @@ private int getIndexForKeyGroup(int keyGroupIdx) {
     return keyGroupIdx - this.localKeyGroupRangeStartIdx;
   }
 
-  private class KeyGroupBagCombiner<T> implements KeyGroupCombiner<T, List<T>, 
Iterable<T>> {
+  private static class KeyGroupBagCombiner<T> implements KeyGroupCombiner<T, 
List<T>, Iterable<T>> {
 
     @Override
     public List<T> createAccumulator() {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index 3374ba25cb2..974f4ba1380 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -76,22 +76,22 @@ public K getKey() {
         new StateTag.StateBinder() {
 
           @Override
-          public <T> ValueState<T> bindValue(
-              StateTag<ValueState<T>> address, Coder<T> coder) {
+          public <T2> ValueState<T2> bindValue(
+              StateTag<ValueState<T2>> address, Coder<T2> coder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", 
ValueState.class.getSimpleName()));
           }
 
           @Override
-          public <T> BagState<T> bindBag(
-              StateTag<BagState<T>> address, Coder<T> elemCoder) {
+          public <T2> BagState<T2> bindBag(
+              StateTag<BagState<T2>> address, Coder<T2> elemCoder) {
 
             return new FlinkSplitBagState<>(stateBackend, address, namespace, 
elemCoder);
           }
 
           @Override
-          public <T> SetState<T> bindSet(
-              StateTag<SetState<T>> address, Coder<T> elemCoder) {
+          public <T2> SetState<T2> bindSet(
+              StateTag<SetState<T2>> address, Coder<T2> elemCoder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", 
SetState.class.getSimpleName()));
           }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index 6c35f5372f5..43e2c90d65d 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -89,8 +89,11 @@ public Instant watermarkHold() {
   @Override
   public K getKey() {
     ByteBuffer keyBytes = flinkStateBackend.getCurrentKey();
+    byte[] bytes = new byte[keyBytes.remaining()];
+    keyBytes.get(bytes);
+    keyBytes.position(keyBytes.position() - bytes.length);
     try {
-      return CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
+      return CoderUtils.decodeFromByteArray(keyCoder, bytes);
     } catch (CoderException e) {
       throw new RuntimeException("Error decoding key.", e);
     }
@@ -106,22 +109,22 @@ public K getKey() {
         new StateTag.StateBinder() {
 
           @Override
-          public <T> ValueState<T> bindValue(
-              StateTag<ValueState<T>> address, Coder<T> coder) {
+          public <T2> ValueState<T2> bindValue(
+              StateTag<ValueState<T2>> address, Coder<T2> coder) {
 
             return new FlinkValueState<>(flinkStateBackend, address, 
namespace, coder);
           }
 
           @Override
-          public <T> BagState<T> bindBag(
-              StateTag<BagState<T>> address, Coder<T> elemCoder) {
+          public <T2> BagState<T2> bindBag(
+              StateTag<BagState<T2>> address, Coder<T2> elemCoder) {
 
             return new FlinkBagState<>(flinkStateBackend, address, namespace, 
elemCoder);
           }
 
           @Override
-          public <T> SetState<T> bindSet(
-              StateTag<SetState<T>> address, Coder<T> elemCoder) {
+          public <T2> SetState<T2> bindSet(
+              StateTag<SetState<T2>> address, Coder<T2> elemCoder) {
             return new FlinkSetState<>(
                 flinkStateBackend, address, namespace, elemCoder);
           }
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
index 73be0ef09ce..15d76dfa3c7 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
@@ -22,6 +22,7 @@
 import static org.junit.Assert.assertThat;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.ValueWithRecordId;
@@ -52,14 +53,20 @@ public void testDeduping() throws Exception {
     String key1 = "key1";
     String key2 = "key2";
 
-    harness.processElement(new StreamRecord<>(
-        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key1, 
key1.getBytes()))));
+    harness.processElement(
+        new StreamRecord<>(
+            WindowedValue.valueInGlobalWindow(
+                new ValueWithRecordId<>(key1, 
key1.getBytes(StandardCharsets.UTF_8)))));
 
-    harness.processElement(new StreamRecord<>(
-        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key2, 
key2.getBytes()))));
+    harness.processElement(
+        new StreamRecord<>(
+            WindowedValue.valueInGlobalWindow(
+                new ValueWithRecordId<>(key2, 
key2.getBytes(StandardCharsets.UTF_8)))));
 
-    harness.processElement(new StreamRecord<>(
-        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key1, 
key1.getBytes()))));
+    harness.processElement(
+        new StreamRecord<>(
+            WindowedValue.valueInGlobalWindow(
+                new ValueWithRecordId<>(key1, 
key1.getBytes(StandardCharsets.UTF_8)))));
 
     assertThat(
         stripStreamRecordFromWindowedValue(harness.getOutput()),
@@ -77,11 +84,15 @@ public void testDeduping() throws Exception {
 
     String key3 = "key3";
 
-    harness.processElement(new StreamRecord<>(
-        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key2, 
key2.getBytes()))));
+    harness.processElement(
+        new StreamRecord<>(
+            WindowedValue.valueInGlobalWindow(
+                new ValueWithRecordId<>(key2, 
key2.getBytes(StandardCharsets.UTF_8)))));
 
-    harness.processElement(new StreamRecord<>(
-        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key3, 
key3.getBytes()))));
+    harness.processElement(
+        new StreamRecord<>(
+            WindowedValue.valueInGlobalWindow(
+                new ValueWithRecordId<>(key3, 
key3.getBytes(StandardCharsets.UTF_8)))));
 
     assertThat(
         stripStreamRecordFromWindowedValue(harness.getOutput()),
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 48078117b65..d9c008e5a11 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -82,6 +82,8 @@ public void processElement(ProcessContext c) {
     }
   }
 
+  // suppress since toString() of Void is called and key is deliberately null
+  @SuppressWarnings("ObjectToString")
   @Override
   protected void testProgram() throws Exception {
 
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index df6c4d179a9..1aa6c5ee8d5 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -114,7 +114,7 @@ public int getShardNumber() {
     return splits;
   }
 
-  class CounterMark implements UnboundedSource.CheckpointMark {
+  static class CounterMark implements UnboundedSource.CheckpointMark {
     int current;
 
     public CounterMark(int current) {
@@ -239,7 +239,8 @@ public CountingSourceReader createReader(
     return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
   }
 
-  private class FromCounterMark implements 
DelegateCoder.CodingFunction<CounterMark, Integer> {
+  private static class FromCounterMark
+      implements DelegateCoder.CodingFunction<CounterMark, Integer> {
     @Override
     public Integer apply(CounterMark input) {
       return input.current;
@@ -256,7 +257,7 @@ public boolean equals(Object obj) {
     }
   }
 
-  private class ToCounterMark implements DelegateCoder.CodingFunction<Integer, 
CounterMark> {
+  private static class ToCounterMark implements 
DelegateCoder.CodingFunction<Integer, CounterMark> {
     @Override
     public CounterMark apply(Integer input) {
       return new CounterMark(input);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 110077)
    Time Spent: 40m  (was: 0.5h)

> Enforce ErrorProne analysis in Flink runner project
> ---------------------------------------------------
>
>                 Key: BEAM-4311
>                 URL: https://issues.apache.org/jira/browse/BEAM-4311
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Scott Wegner
>            Assignee: Tim Robertson
>            Priority: Minor
>              Labels: errorprone, starter
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Java ErrorProne static analysis was [recently 
> enabled|https://github.com/apache/beam/pull/5161] in the Gradle build 
> process, but only as warnings. ErrorProne errors are generally useful and 
> easy to fix. Some work was done to [make sdks-java-core 
> ErrorProne-clean|https://github.com/apache/beam/pull/5319] and add 
> enforcement. This task is to clean ErrorProne warnings and add enforcement in 
> {{beam-runners-flink}}. Additional context discussed on the [dev 
> list|https://lists.apache.org/thread.html/95aae2785c3cd728c2d3378cbdff2a7ba19caffcd4faa2049d2e2f46@%3Cdev.beam.apache.org%3E].
> Fixing this issue will involve:
> # Follow instructions in the [Contribution 
> Guide|https://beam.apache.org/contribute/] to set up a {{beam}} development 
> environment.
> # Run the following command to compile and run ErrorProne analysis on the 
> project: {{./gradlew :beam-runners-flink:assemble}}
> # Fix each ErrorProne warning from the {{runners/flink}} project.
> # In {{runners/flink/build.gradle}}, add {{failOnWarning: true}} to the call 
> to {{applyJavaNature()}} 
> ([example|https://github.com/apache/beam/pull/5319/files#diff-9390c20635aed5f42f83b97506a87333R20]).
> This starter issue is sponsored by [~swegner]. Feel free to [reach 
> out|https://beam.apache.org/community/contact-us/] with questions or code 
> review:
> * JIRA: [~swegner]
> * GitHub: [@swegner|https://github.com/swegner]
> * Slack: [@Scott Wegner|https://s.apache.org/beam-slack-channel]
> * Email: swegner at google dot com



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to