[ 
https://issues.apache.org/jira/browse/BEAM-4309?focusedWorklogId=112301&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112301
 ]

ASF GitHub Bot logged work on BEAM-4309:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Jun/18 11:59
            Start Date: 15/Jun/18 11:59
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #5644: [BEAM-4309] Fix 
ErrorProne violations in direct runner
URL: https://github.com/apache/beam/pull/5644
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index 59f8be9a383..c6e7f557bae 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -24,7 +24,7 @@ def dependOnProjects = [":beam-model-pipeline", 
":beam-runners-core-construction
                         ":beam-runners-java-fn-execution", 
":beam-sdks-java-fn-execution"]
 
 apply from: project(":").file("build_rules.gradle")
-applyJavaNature(shadowClosure: DEFAULT_SHADOW_CLOSURE << {
+applyJavaNature(failOnWarning: true, shadowClosure: DEFAULT_SHADOW_CLOSURE << {
   dependencies {
     include(dependency(library.java.protobuf_java))
     include(dependency(library.java.protobuf_java_util))
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
index 5522dac8627..a1da613df20 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
@@ -37,6 +37,7 @@
   /**
    * Returns the PCollection that the elements of this bundle belong to.
    */
+  @Override
   @Nullable
   PCollection<T> getPCollection();
 
@@ -44,6 +45,7 @@
    * Returns the key that was output in the most recent {@code GroupByKey} in 
the
    * execution of this bundle.
    */
+  @Override
   StructuralKey<?> getKey();
 
   /**
@@ -58,6 +60,7 @@
    * <p>This should be equivalent to iterating over all of the elements within 
a bundle and
    * selecting the minimum timestamp from among them.
    */
+  @Override
   Instant getMinimumTimestamp();
 
   /**
@@ -69,6 +72,7 @@
    * processing time {@link TimerData timer} at the time this bundle was 
committed, including any
    * timers that fired to produce this bundle.
    */
+  @Override
   Instant getSynchronizedProcessingOutputWatermark();
   /**
    * Return a new {@link CommittedBundle} that is like this one, except calls 
to
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index 42d72ebcbac..e1da6e31257 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -239,9 +239,13 @@ public static MetricQueryResults create(
   abstract static class DirectMetricResult<T> implements MetricResult<T> {
     // need to define these here so they appear in the correct order
     // and the generated constructor is usable and consistent
+    @Override
     public abstract MetricName getName();
+    @Override
     public abstract String getStep();
+    @Override
     public abstract T getCommitted();
+    @Override
     public abstract T getAttempted();
 
     public static <T> MetricResult<T> create(MetricName name, String scope,
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 2ccea0e66bd..28ab81c3ecc 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -142,6 +142,8 @@ public TransformExecutorService load(StepAndKey stepAndKey) 
throws Exception {
   }
 
   @Override
+  // TODO: [BEAM-4563] Pass Future back to consumer to check for async errors
+  @SuppressWarnings("FutureReturnValueIgnored")
   public void start(DirectGraph graph, RootProviderRegistry 
rootProviderRegistry) {
     int numTargetSplits = Math.max(3, targetParallelism);
     ImmutableMap.Builder<AppliedPTransform<?, ?, ?>, 
ConcurrentLinkedQueue<CommittedBundle<?>>>
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
index 9aa71f742d8..3e5062c0400 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
@@ -69,6 +69,8 @@ private ParallelTransformExecutor(ExecutorService executor) {
     }
 
     @Override
+    // TODO: [BEAM-4563] Pass Future back to consumer to check for async errors
+    @SuppressWarnings("FutureReturnValueIgnored")
     public void schedule(TransformExecutor work) {
       if (active.get()) {
         try {
@@ -152,6 +154,8 @@ public void shutdown() {
       workQueue.clear();
     }
 
+    // TODO: [BEAM-4563] Pass Future back to consumer to check for async errors
+    @SuppressWarnings("FutureReturnValueIgnored")
     private void updateCurrentlyEvaluating() {
       if (currentlyEvaluating.get() == null) {
         // Only synchronize if we need to update what's currently evaluating
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 4640efd5dc1..c202fffef97 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -119,6 +119,8 @@ public UnboundedReadEvaluator(
     }
 
     @Override
+    @SuppressWarnings("Finally") // Cannot use try-with-resources in order to 
ensure we don't
+                                 // double-close the reader.
     public void processElement(
         WindowedValue<UnboundedSourceShard<OutputT, CheckpointMarkT>> element) 
throws IOException {
       UncommittedBundle<OutputT> output =
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedBundle.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedBundle.java
index 94bede0a6f2..26c760b236b 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedBundle.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedBundle.java
@@ -39,12 +39,14 @@
    * Returns the PCollection that the elements of this bundle belong to.
    */
   @Nullable
+  @Override
   PCollectionNode getPCollection();
 
   /**
    * Returns the key that was output in the most recent {@code GroupByKey} in 
the
    * execution of this bundle.
    */
+  @Override
   StructuralKey<?> getKey();
 
   /**
@@ -59,6 +61,7 @@
    * <p>This should be equivalent to iterating over all of the elements within 
a bundle and
    * selecting the minimum timestamp from among them.
    */
+  @Override
   Instant getMinimumTimestamp();
 
   /**
@@ -70,6 +73,7 @@
    * processing time {@link TimerData timer} at the time this bundle was 
committed, including any
    * timers that fired to produce this bundle.
    */
+  @Override
   Instant getSynchronizedProcessingOutputWatermark();
   /**
    * Return a new {@link CommittedBundle} that is like this one, except calls 
to
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectMetrics.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectMetrics.java
index cb217e26e39..f873ee47c2d 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectMetrics.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectMetrics.java
@@ -110,9 +110,11 @@ public void updatePhysical(CommittedBundle<?> bundle, 
UpdateT tentativeCumulativ
      * @param bundle The bundle being committed.
      * @param finalCumulative The final cumulative value for the given bundle.
      */
+    @SuppressWarnings("FutureReturnValueIgnored") // direct runner metrics are 
best-effort;
+                                                  // we choose not to block on 
async commit
     public void commitPhysical(final CommittedBundle<?> bundle, final UpdateT 
finalCumulative) {
       // To prevent a query from blocking the commit, we perform the commit in 
two steps.
-      // 1. We perform a non-blocking write to the uncommitted table to make 
the new vaule
+      // 1. We perform a non-blocking write to the uncommitted table to make 
the new value
       //    available immediately.
       // 2. We submit a runnable that will commit the update and remove the 
tentative value in
       //    a synchronized block.
@@ -247,9 +249,13 @@ public static MetricQueryResults create(
   abstract static class DirectMetricResult<T> implements MetricResult<T> {
     // need to define these here so they appear in the correct order
     // and the generated constructor is usable and consistent
+    @Override
     public abstract MetricName getName();
+    @Override
     public abstract String getStep();
+    @Override
     public abstract T getCommitted();
+    @Override
     public abstract T getAttempted();
 
     public static <T> MetricResult<T> create(MetricName name, String scope,
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
index 4b1d91714b2..e70f5054b04 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
@@ -142,6 +142,8 @@ public TransformExecutorService load(StepAndKey stepAndKey) 
throws Exception {
   }
 
   @Override
+  // TODO: [BEAM-4563] Pass Future back to consumer to check for async errors
+  @SuppressWarnings("FutureReturnValueIgnored")
   public void start() {
     int numTargetSplits = Math.max(3, targetParallelism);
     ImmutableMap.Builder<PTransformNode, 
ConcurrentLinkedQueue<CommittedBundle<?>>>
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformExecutorServices.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformExecutorServices.java
index 5214f75fa04..d03ef8930fa 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformExecutorServices.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformExecutorServices.java
@@ -69,6 +69,8 @@ private ParallelTransformExecutor(ExecutorService executor) {
     }
 
     @Override
+    // TODO: [BEAM-4563] Pass Future back to consumer to check for async errors
+    @SuppressWarnings("FutureReturnValueIgnored")
     public void schedule(TransformExecutor work) {
       if (active.get()) {
         try {
@@ -152,6 +154,8 @@ public void shutdown() {
       workQueue.clear();
     }
 
+    // TODO: [BEAM-4563] Pass Future back to consumer to check for async errors
+    @SuppressWarnings("FutureReturnValueIgnored")
     private void updateCurrentlyEvaluating() {
       if (currentlyEvaluating.get() == null) {
         // Only synchronize if we need to update what's currently evaluating
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
index 4cb9722f76e..67297b4ec4b 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
@@ -130,6 +130,7 @@ public void prepare(
   }
 
   @Override
+  @SuppressWarnings("FutureReturnValueIgnored") // Run API does not block on 
execution
   public void run(
       JobApi.RunJobRequest request, StreamObserver<JobApi.RunJobResponse> 
responseObserver) {
     try {
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 3f2a4772a97..2f15e5e474b 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -270,9 +270,10 @@ public void cancelShouldStopPipeline() throws Exception {
         };
 
     ExecutorService executor = Executors.newCachedThreadPool();
-    executor.submit(cancelRunnable);
+    Future<?> cancelResult = executor.submit(cancelRunnable);
     Future<PipelineResult> result = executor.submit(runPipelineRunnable);
 
+    cancelResult.get();
     // If cancel doesn't work, this will hang forever
     result.get().waitUntilFinish();
   }
@@ -320,7 +321,7 @@ public void teardown() {
           try {
             Thread.sleep(1000);
           } catch (final InterruptedException e) {
-            fail();
+            throw new AssertionError(e);
           }
           TEARDOWN_CALL.set(System.nanoTime());
         }
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTransformExecutorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTransformExecutorTest.java
index 1760fb97b81..8fcd147c918 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTransformExecutorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTransformExecutorTest.java
@@ -191,9 +191,10 @@ public void processElement(WindowedValue<String> element) 
throws Exception {
             completionCallback,
             transformEvaluationState);
 
-    Executors.newSingleThreadExecutor().submit(executor);
+    Future<?> future = Executors.newSingleThreadExecutor().submit(executor);
 
     evaluatorCompleted.await();
+    future.get();
 
     assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
     assertThat(completionCallback.handledResult, equalTo(result));
@@ -201,6 +202,7 @@ public void processElement(WindowedValue<String> element) 
throws Exception {
   }
 
   @Test
+  @SuppressWarnings("FutureReturnValueIgnored") // expected exception checked 
via completionCallback
   public void processElementThrowsExceptionCallsback() throws Exception {
     final TransformResult<String> result =
         StepTransformResult.<String>withoutHold(downstreamProducer).build();
@@ -241,6 +243,7 @@ public void processElement(WindowedValue<String> element) 
throws Exception {
   }
 
   @Test
+  @SuppressWarnings("FutureReturnValueIgnored") // expected exception checked 
via completionCallback
   public void finishBundleThrowsExceptionCallsback() throws Exception {
     final Exception exception = new Exception();
     TransformEvaluator<String> evaluator =
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 73ec1d8320c..482ada175f0 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -262,7 +263,7 @@ public void 
getExecutionContextDifferentStepsIndependentState() {
 
   @Test
   public void handleResultStoresState() {
-    StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(), 
ByteArrayCoder.of());
+    StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(UTF_8), 
ByteArrayCoder.of());
     DirectExecutionContext fooContext =
         context.getExecutionContext(downstreamProducer, myKey);
 
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
index 9a78a044fc0..153681f0d3e 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
@@ -23,6 +23,7 @@
 
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -56,7 +57,7 @@
 
   @Test
   @Ignore("https://issues.apache.org/jira/browse/BEAM-4088 Test reliably 
fails.")
-  public void ensureMetricsThreadDoesntLeak() {
+  public void ensureMetricsThreadDoesntLeak() throws ExecutionException, 
InterruptedException {
     final DirectGraph graph =
         DirectGraph.create(
             emptyMap(), emptyMap(), LinkedListMultimap.create(), emptySet(), 
emptyMap());
@@ -68,7 +69,7 @@ public void ensureMetricsThreadDoesntLeak() {
                 .build());
 
     // fake a metrics usage
-    metricsExecutorService.submit(() -> {});
+    metricsExecutorService.submit(() -> {}).get();
 
     final EvaluationContext context =
         EvaluationContext.create(
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index 0620fb4ac16..b4c8b87ab24 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.io.Serializable;
 import java.util.Collections;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -54,7 +56,7 @@ public void setup() {
     factory = new ImmutabilityEnforcementFactory();
     bundleFactory = ImmutableListBundleFactory.create();
     pcollection =
-        p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
+        p.apply(Create.of("foo".getBytes(UTF_8), 
"spamhameggs".getBytes(UTF_8)))
             .apply(
                 ParDo.of(
                     new DoFn<byte[], byte[]>() {
@@ -71,7 +73,7 @@ public void processElement(ProcessContext c)
 
   @Test
   public void unchangedSucceeds() {
-    WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes());
+    WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes(UTF_8));
     CommittedBundle<byte[]> elements =
         
bundleFactory.createBundle(pcollection).add(element).commit(Instant.now());
 
@@ -86,7 +88,7 @@ public void unchangedSucceeds() {
 
   @Test
   public void mutatedDuringProcessElementThrows() {
-    WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes());
+    WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes(UTF_8));
     CommittedBundle<byte[]> elements =
         
bundleFactory.createBundle(pcollection).add(element).commit(Instant.now());
 
@@ -107,7 +109,7 @@ public void mutatedDuringProcessElementThrows() {
   @Test
   public void mutatedAfterProcessElementFails() {
 
-    WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes());
+    WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes(UTF_8));
     CommittedBundle<byte[]> elements =
         
bundleFactory.createBundle(pcollection).add(element).commit(Instant.now());
 
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
index 4236f06e64f..618b1fd79c1 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -27,6 +27,8 @@
 import static org.mockito.Mockito.doAnswer;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -461,7 +463,8 @@ private CountDownLatch invokeLatchedCallback(
             invocation -> {
               Object callback = invocation.getArguments()[3];
               final Runnable callbackRunnable = (Runnable) callback;
-              Executors.newSingleThreadExecutor()
+              ListenableFuture<?> result = MoreExecutors
+                  .listeningDecorator(Executors.newSingleThreadExecutor())
                   .submit(
                       () -> {
                         try {
@@ -471,9 +474,8 @@ private CountDownLatch invokeLatchedCallback(
                           callbackRunnable.run();
                           onComplete.countDown();
                         } catch (InterruptedException e) {
-                          fail(
-                              "Unexpectedly interrupted while waiting for 
latch "
-                                  + "to be counted down");
+                          throw new AssertionError(
+                              "Unexpectedly interrupted while waiting for 
latch ", e);
                         }
                       });
               return null;
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java
index 0aa2c493a82..0e887ced3c8 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java
@@ -22,8 +22,14 @@
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertThat;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +63,7 @@ public void cachedIdDeduplicatorTrueForFirstIdThenFalse() {
   }
 
   @Test
-  public void cachedIdDeduplicatorMultithreaded() throws InterruptedException {
+  public void cachedIdDeduplicatorMultithreaded() throws InterruptedException, 
ExecutionException {
     byte[] id = new byte[] {-1, 2, 4, 22};
     UnboundedReadDeduplicator dedupper = CachedIdDeduplicator.create();
     final CountDownLatch startSignal = new CountDownLatch(1);
@@ -65,22 +71,25 @@ public void cachedIdDeduplicatorMultithreaded() throws 
InterruptedException {
     final CountDownLatch readyLatch = new CountDownLatch(numThreads);
     final CountDownLatch finishLine = new CountDownLatch(numThreads);
 
-    ExecutorService executor = Executors.newCachedThreadPool();
+    ListeningExecutorService executor = MoreExecutors.listeningDecorator(
+        Executors.newCachedThreadPool());
     AtomicInteger successCount = new AtomicInteger();
     AtomicInteger noOutputCount = new AtomicInteger();
+    List<ListenableFuture<?>> futures = new ArrayList<>();
     for (int i = 0; i < numThreads; i++) {
-      executor.submit(new TryOutputIdRunnable(dedupper,
+      futures.add(executor.submit(new TryOutputIdRunnable(dedupper,
           id,
           successCount,
           noOutputCount,
           readyLatch,
           startSignal,
-          finishLine));
+          finishLine)));
     }
 
     readyLatch.await();
     startSignal.countDown();
     finishLine.await(10L, TimeUnit.SECONDS);
+    Futures.allAsList(futures).get();
     executor.shutdownNow();
 
     // The first thread to run will succeed, and no others will
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 92136606415..a4113f9113e 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.direct;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
@@ -27,11 +28,13 @@
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
+import com.google.common.base.Splitter;
 import java.io.File;
-import java.io.FileReader;
 import java.io.Reader;
 import java.io.Serializable;
 import java.nio.CharBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -108,12 +111,12 @@ public void dynamicallyReshardedWrite() throws Exception {
       String filename = match.resourceId().toString();
       files.add(filename);
       CharBuffer buf = CharBuffer.allocate((int) new File(filename).length());
-      try (Reader reader = new FileReader(filename)) {
+      try (Reader reader = Files.newBufferedReader(Paths.get(filename), 
UTF_8)) {
         reader.read(buf);
         buf.flip();
       }
 
-      String[] readStrs = buf.toString().split("\n");
+      Iterable<String> readStrs = Splitter.on("\n").split(buf.toString());
       for (String read : readStrs) {
         if (read.length() > 0) {
           actuals.add(read);
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTransformExecutorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTransformExecutorTest.java
index 4eb02657fce..50d2326e949 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTransformExecutorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTransformExecutorTest.java
@@ -33,6 +33,7 @@
 import java.util.EnumSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
@@ -193,9 +194,10 @@ public void processElement(WindowedValue<String> element) 
throws Exception {
             completionCallback,
             transformEvaluationState);
 
-    Executors.newSingleThreadExecutor().submit(executor);
+    Future<?> future = Executors.newSingleThreadExecutor().submit(executor);
 
     evaluatorCompleted.await();
+    future.get();
 
     assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
     assertThat(completionCallback.handledResult, Matchers.equalTo(result));
@@ -203,6 +205,7 @@ public void processElement(WindowedValue<String> element) 
throws Exception {
   }
 
   @Test
+  @SuppressWarnings("FutureReturnValueIgnored") // expected exception checked 
via completionCallback
   public void processElementThrowsExceptionCallsback() throws Exception {
     final TransformResult<String> result =
         StepTransformResult.<String>withoutHold(downstreamProducer).build();
@@ -242,6 +245,7 @@ public void processElement(WindowedValue<String> element) 
throws Exception {
   }
 
   @Test
+  @SuppressWarnings("FutureReturnValueIgnored") // expected exception checked 
via completionCallback
   public void finishBundleThrowsExceptionCallsback() throws Exception {
     final Exception exception = new Exception();
     TransformEvaluator<String> evaluator =
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
index 6ce38b53f1d..d41292bf31f 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct.portable;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
@@ -149,7 +150,7 @@ public void 
getExecutionContextDifferentStepsIndependentState() {
 
   @Test
   public void handleResultStoresState() {
-    StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(), 
ByteArrayCoder.of());
+    StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(UTF_8), 
ByteArrayCoder.of());
     StepStateAndTimers fooContext = 
context.getStateAndTimers(downstreamProducer, myKey);
 
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", 
VarIntCoder.of());
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest.java
index e49588b2776..ef285469902 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.direct.portable.artifact;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.not;
@@ -91,7 +92,7 @@ public void teardown() throws Exception {
   @Test
   public void retrieveManifest() throws Exception {
     Map<String, byte[]> artifacts = new HashMap<>();
-    artifacts.put("foo", "bar, baz, quux".getBytes());
+    artifacts.put("foo", "bar, baz, quux".getBytes(UTF_8));
     artifacts.put("spam", new byte[] {127, -22, 5});
     stageAndCreateRetrievalService(artifacts);
 
@@ -129,7 +130,7 @@ public void onCompleted() {
   @Test
   public void retrieveArtifact() throws Exception {
     Map<String, byte[]> artifacts = new HashMap<>();
-    byte[] fooContents = "bar, baz, quux".getBytes();
+    byte[] fooContents = "bar, baz, quux".getBytes(UTF_8);
     artifacts.put("foo", fooContents);
     byte[] spamContents = {127, -22, 5};
     artifacts.put("spam", spamContents);
@@ -152,7 +153,8 @@ public void retrieveArtifact() throws Exception {
 
   @Test
   public void retrieveArtifactNotPresent() throws Exception {
-    stageAndCreateRetrievalService(Collections.singletonMap("foo", "bar, baz, 
quux".getBytes()));
+    stageAndCreateRetrievalService(Collections.singletonMap(
+        "foo", "bar, baz, quux".getBytes(UTF_8)));
 
     final CountDownLatch completed = new CountDownLatch(1);
     final AtomicReference<Throwable> thrown = new AtomicReference<>();
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactStagerServiceTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactStagerServiceTest.java
index 84b4a073b7c..e43481e51f9 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactStagerServiceTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactStagerServiceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.direct.portable.artifact;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
@@ -79,7 +80,7 @@ public void teardown() {
 
   @Test
   public void singleDataPutArtifactSucceeds() throws Exception {
-    byte[] data = "foo-bar-baz".getBytes();
+    byte[] data = "foo-bar-baz".getBytes(UTF_8);
     RecordingStreamObserver<ArtifactApi.PutArtifactResponse> responseObserver =
         new RecordingStreamObserver<>();
     StreamObserver<ArtifactApi.PutArtifactRequest> requestObserver =
@@ -112,9 +113,9 @@ public void singleDataPutArtifactSucceeds() throws 
Exception {
 
   @Test
   public void multiPartPutArtifactSucceeds() throws Exception {
-    byte[] partOne = "foo-".getBytes();
-    byte[] partTwo = "bar-".getBytes();
-    byte[] partThree = "baz".getBytes();
+    byte[] partOne = "foo-".getBytes(UTF_8);
+    byte[] partTwo = "bar-".getBytes(UTF_8);
+    byte[] partThree = "baz".getBytes(UTF_8);
     RecordingStreamObserver<ArtifactApi.PutArtifactResponse> responseObserver =
         new RecordingStreamObserver<>();
     StreamObserver<ArtifactApi.PutArtifactRequest> requestObserver =
@@ -158,12 +159,12 @@ public void multiPartPutArtifactSucceeds() throws 
Exception {
     assertThat(staged.exists(), is(true));
     ByteBuffer buf = ByteBuffer.allocate("foo-bar-baz".length());
     new FileInputStream(staged).getChannel().read(buf);
-    Assert.assertArrayEquals("foo-bar-baz".getBytes(), buf.array());
+    Assert.assertArrayEquals("foo-bar-baz".getBytes(UTF_8), buf.array());
   }
 
   @Test
   public void putArtifactBeforeNameFails() {
-    byte[] data = "foo-".getBytes();
+    byte[] data = "foo-".getBytes(UTF_8);
     RecordingStreamObserver<ArtifactApi.PutArtifactResponse> responseObserver =
         new RecordingStreamObserver<>();
     StreamObserver<ArtifactApi.PutArtifactRequest> requestObserver =
@@ -200,9 +201,9 @@ public void putArtifactWithNoContentFails() {
   @Test
   public void commitManifestWithAllArtifactsSucceeds() {
     ArtifactApi.ArtifactMetadata firstArtifact =
-        stageBytes("first-artifact", "foo, bar, baz, quux".getBytes());
+        stageBytes("first-artifact", "foo, bar, baz, quux".getBytes(UTF_8));
     ArtifactApi.ArtifactMetadata secondArtifact =
-        stageBytes("second-artifact", "spam, ham, eggs".getBytes());
+        stageBytes("second-artifact", "spam, ham, eggs".getBytes(UTF_8));
 
     ArtifactApi.Manifest manifest =
         ArtifactApi.Manifest.newBuilder()
@@ -227,7 +228,7 @@ public void commitManifestWithAllArtifactsSucceeds() {
   @Test
   public void commitManifestWithMissingArtifactFails() {
     ArtifactApi.ArtifactMetadata firstArtifact =
-        stageBytes("first-artifact", "foo, bar, baz, quux".getBytes());
+        stageBytes("first-artifact", "foo, bar, baz, quux".getBytes(UTF_8));
     ArtifactApi.ArtifactMetadata absentArtifact =
         ArtifactApi.ArtifactMetadata.newBuilder().setName("absent").build();
 
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalServiceTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalServiceTest.java
index ede7fe7849b..05aac15d9c8 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalServiceTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalServiceTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.beam.runners.direct.portable.artifact;
 
-import static org.junit.Assert.fail;
-
 import io.grpc.stub.StreamObserver;
 import java.util.Optional;
 import java.util.concurrent.SynchronousQueue;
@@ -69,7 +67,7 @@ public void onNext(ArtifactChunk value) {
             try {
               thrown.put(Optional.empty());
             } catch (InterruptedException e) {
-              fail();
+              throw new AssertionError(e);
             }
           }
 
@@ -78,7 +76,7 @@ public void onError(Throwable t) {
             try {
               thrown.put(Optional.of(t));
             } catch (InterruptedException e) {
-              fail();
+              throw new AssertionError(e);
             }
           }
 
@@ -87,7 +85,7 @@ public void onCompleted() {
             try {
               thrown.put(Optional.empty());
             } catch (InterruptedException e) {
-              fail();
+              throw new AssertionError(e);
             }
           }
         });
@@ -113,7 +111,7 @@ public void onNext(GetManifestResponse value) {
             try {
               thrown.put(Optional.empty());
             } catch (InterruptedException e) {
-              fail();
+              throw new AssertionError(e);
             }
           }
 
@@ -122,7 +120,7 @@ public void onError(Throwable t) {
             try {
               thrown.put(Optional.of(t));
             } catch (InterruptedException e) {
-              fail();
+              throw new AssertionError(e);
             }
           }
 
@@ -131,7 +129,7 @@ public void onCompleted() {
             try {
               thrown.put(Optional.empty());
             } catch (InterruptedException e) {
-              fail();
+              throw new AssertionError(e);
             }
           }
         });
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
index 9867fc0affc..4de74f1792d 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.beam.runners.direct.portable.job;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.Matchers.hasItems;
 import static org.junit.Assert.assertThat;
 
@@ -96,8 +97,8 @@ public void testPrepareJob() throws Exception {
     ArtifactServiceStager stager =
         ArtifactServiceStager.overChannel(
             InProcessChannelBuilder.forName(stagingEndpoint.getUrl()).build());
-    File foo = writeTempFile("foo", "foo, bar, baz".getBytes());
-    File bar = writeTempFile("spam", "spam, ham, eggs".getBytes());
+    File foo = writeTempFile("foo", "foo, bar, baz".getBytes(UTF_8));
+    File bar = writeTempFile("spam", "spam, ham, eggs".getBytes(UTF_8));
     stager.stage(
         ImmutableList.of(StagedFile.of(foo, foo.getName()), StagedFile.of(bar, 
bar.getName())));
     List<byte[]> tempDirFiles = readFlattenedFiles(runnerTemp.getRoot());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 112301)
    Time Spent: 1h  (was: 50m)

> Enforce ErrorProne analysis in direct-runner project
> ----------------------------------------------------
>
>                 Key: BEAM-4309
>                 URL: https://issues.apache.org/jira/browse/BEAM-4309
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-direct
>            Reporter: Scott Wegner
>            Assignee: Scott Wegner
>            Priority: Minor
>              Labels: errorprone, starter
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Java ErrorProne static analysis was [recently 
> enabled|https://github.com/apache/beam/pull/5161] in the Gradle build 
> process, but only as warnings. ErrorProne errors are generally useful and 
> easy to fix. Some work was done to [make sdks-java-core 
> ErrorProne-clean|https://github.com/apache/beam/pull/5319] and add 
> enforcement. This task is clean ErrorProne warnings and add enforcement in 
> {{beam-runners-direct-java}}. Additional context discussed on the [dev 
> list|https://lists.apache.org/thread.html/95aae2785c3cd728c2d3378cbdff2a7ba19caffcd4faa2049d2e2f46@%3Cdev.beam.apache.org%3E].
> Fixing this issue will involve:
> # Follow instructions in the [Contribution 
> Guide|https://beam.apache.org/contribute/] to set up a {{beam}} development 
> environment.
> # Run the following command to compile and run ErrorProne analysis on the 
> project: {{./gradlew :beam-runners-direct-java:assemble}}
> # Fix each ErrorProne warning from the {{runners/direct-java}} project.
> # In {{runners/direct-java/build.gradle}}, add {{failOnWarning: true}} to the 
> call the {{applyJavaNature()}} 
> ([example|https://github.com/apache/beam/pull/5319/files#diff-9390c20635aed5f42f83b97506a87333R20]).
> This starter issue is sponsored by [~swegner]. Feel free to [reach 
> out|https://beam.apache.org/community/contact-us/] with questions or code 
> review:
> * JIRA: [~swegner]
> * GitHub: [@swegner|https://github.com/swegner]
> * Slack: [@Scott Wegner|https://s.apache.org/beam-slack-channel]
> * Email: swegner at google dot com



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to