[
https://issues.apache.org/jira/browse/BEAM-4309?focusedWorklogId=112301&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112301
]
ASF GitHub Bot logged work on BEAM-4309:
----------------------------------------
Author: ASF GitHub Bot
Created on: 15/Jun/18 11:59
Start Date: 15/Jun/18 11:59
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #5644: [BEAM-4309] Fix
ErrorProne violations in direct runner
URL: https://github.com/apache/beam/pull/5644
This is a PR merged from a forked repository.
Because GitHub does not show the original diff of a fork-based (foreign)
pull request once it has been merged, the diff is reproduced below for
the sake of provenance:
diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index 59f8be9a383..c6e7f557bae 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -24,7 +24,7 @@ def dependOnProjects = [":beam-model-pipeline",
":beam-runners-core-construction
":beam-runners-java-fn-execution",
":beam-sdks-java-fn-execution"]
apply from: project(":").file("build_rules.gradle")
-applyJavaNature(shadowClosure: DEFAULT_SHADOW_CLOSURE << {
+applyJavaNature(failOnWarning: true, shadowClosure: DEFAULT_SHADOW_CLOSURE << {
dependencies {
include(dependency(library.java.protobuf_java))
include(dependency(library.java.protobuf_java_util))
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
index 5522dac8627..a1da613df20 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
@@ -37,6 +37,7 @@
/**
* Returns the PCollection that the elements of this bundle belong to.
*/
+ @Override
@Nullable
PCollection<T> getPCollection();
@@ -44,6 +45,7 @@
* Returns the key that was output in the most recent {@code GroupByKey} in
the
* execution of this bundle.
*/
+ @Override
StructuralKey<?> getKey();
/**
@@ -58,6 +60,7 @@
* <p>This should be equivalent to iterating over all of the elements within
a bundle and
* selecting the minimum timestamp from among them.
*/
+ @Override
Instant getMinimumTimestamp();
/**
@@ -69,6 +72,7 @@
* processing time {@link TimerData timer} at the time this bundle was
committed, including any
* timers that fired to produce this bundle.
*/
+ @Override
Instant getSynchronizedProcessingOutputWatermark();
/**
* Return a new {@link CommittedBundle} that is like this one, except calls
to
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index 42d72ebcbac..e1da6e31257 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -239,9 +239,13 @@ public static MetricQueryResults create(
abstract static class DirectMetricResult<T> implements MetricResult<T> {
// need to define these here so they appear in the correct order
// and the generated constructor is usable and consistent
+ @Override
public abstract MetricName getName();
+ @Override
public abstract String getStep();
+ @Override
public abstract T getCommitted();
+ @Override
public abstract T getAttempted();
public static <T> MetricResult<T> create(MetricName name, String scope,
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 2ccea0e66bd..28ab81c3ecc 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -142,6 +142,8 @@ public TransformExecutorService load(StepAndKey stepAndKey)
throws Exception {
}
@Override
+ // TODO: [BEAM-4563] Pass Future back to consumer to check for async errors
+ @SuppressWarnings("FutureReturnValueIgnored")
public void start(DirectGraph graph, RootProviderRegistry
rootProviderRegistry) {
int numTargetSplits = Math.max(3, targetParallelism);
ImmutableMap.Builder<AppliedPTransform<?, ?, ?>,
ConcurrentLinkedQueue<CommittedBundle<?>>>
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
index 9aa71f742d8..3e5062c0400 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
@@ -69,6 +69,8 @@ private ParallelTransformExecutor(ExecutorService executor) {
}
@Override
+ // TODO: [BEAM-4563] Pass Future back to consumer to check for async errors
+ @SuppressWarnings("FutureReturnValueIgnored")
public void schedule(TransformExecutor work) {
if (active.get()) {
try {
@@ -152,6 +154,8 @@ public void shutdown() {
workQueue.clear();
}
+ // TODO: [BEAM-4563] Pass Future back to consumer to check for async errors
+ @SuppressWarnings("FutureReturnValueIgnored")
private void updateCurrentlyEvaluating() {
if (currentlyEvaluating.get() == null) {
// Only synchronize if we need to update what's currently evaluating
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 4640efd5dc1..c202fffef97 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -119,6 +119,8 @@ public UnboundedReadEvaluator(
}
@Override
+ @SuppressWarnings("Finally") // Cannot use try-with-resources in order to
ensure we don't
+ // double-close the reader.
public void processElement(
WindowedValue<UnboundedSourceShard<OutputT, CheckpointMarkT>> element)
throws IOException {
UncommittedBundle<OutputT> output =
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedBundle.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedBundle.java
index 94bede0a6f2..26c760b236b 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedBundle.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedBundle.java
@@ -39,12 +39,14 @@
* Returns the PCollection that the elements of this bundle belong to.
*/
@Nullable
+ @Override
PCollectionNode getPCollection();
/**
* Returns the key that was output in the most recent {@code GroupByKey} in
the
* execution of this bundle.
*/
+ @Override
StructuralKey<?> getKey();
/**
@@ -59,6 +61,7 @@
* <p>This should be equivalent to iterating over all of the elements within
a bundle and
* selecting the minimum timestamp from among them.
*/
+ @Override
Instant getMinimumTimestamp();
/**
@@ -70,6 +73,7 @@
* processing time {@link TimerData timer} at the time this bundle was
committed, including any
* timers that fired to produce this bundle.
*/
+ @Override
Instant getSynchronizedProcessingOutputWatermark();
/**
* Return a new {@link CommittedBundle} that is like this one, except calls
to
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectMetrics.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectMetrics.java
index cb217e26e39..f873ee47c2d 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectMetrics.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectMetrics.java
@@ -110,9 +110,11 @@ public void updatePhysical(CommittedBundle<?> bundle,
UpdateT tentativeCumulativ
* @param bundle The bundle being committed.
* @param finalCumulative The final cumulative value for the given bundle.
*/
+ @SuppressWarnings("FutureReturnValueIgnored") // direct runner metrics are
best-effort;
+ // we choose not to block on
async commit
public void commitPhysical(final CommittedBundle<?> bundle, final UpdateT
finalCumulative) {
// To prevent a query from blocking the commit, we perform the commit in
two steps.
- // 1. We perform a non-blocking write to the uncommitted table to make
the new vaule
+ // 1. We perform a non-blocking write to the uncommitted table to make
the new value
// available immediately.
// 2. We submit a runnable that will commit the update and remove the
tentative value in
// a synchronized block.
@@ -247,9 +249,13 @@ public static MetricQueryResults create(
abstract static class DirectMetricResult<T> implements MetricResult<T> {
// need to define these here so they appear in the correct order
// and the generated constructor is usable and consistent
+ @Override
public abstract MetricName getName();
+ @Override
public abstract String getStep();
+ @Override
public abstract T getCommitted();
+ @Override
public abstract T getAttempted();
public static <T> MetricResult<T> create(MetricName name, String scope,
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
index 4b1d91714b2..e70f5054b04 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
@@ -142,6 +142,8 @@ public TransformExecutorService load(StepAndKey stepAndKey)
throws Exception {
}
@Override
+ // TODO: [BEAM-4563] Pass Future back to consumer to check for async errors
+ @SuppressWarnings("FutureReturnValueIgnored")
public void start() {
int numTargetSplits = Math.max(3, targetParallelism);
ImmutableMap.Builder<PTransformNode,
ConcurrentLinkedQueue<CommittedBundle<?>>>
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformExecutorServices.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformExecutorServices.java
index 5214f75fa04..d03ef8930fa 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformExecutorServices.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformExecutorServices.java
@@ -69,6 +69,8 @@ private ParallelTransformExecutor(ExecutorService executor) {
}
@Override
+ // TODO: [BEAM-4563] Pass Future back to consumer to check for async errors
+ @SuppressWarnings("FutureReturnValueIgnored")
public void schedule(TransformExecutor work) {
if (active.get()) {
try {
@@ -152,6 +154,8 @@ public void shutdown() {
workQueue.clear();
}
+ // TODO: [BEAM-4563] Pass Future back to consumer to check for async errors
+ @SuppressWarnings("FutureReturnValueIgnored")
private void updateCurrentlyEvaluating() {
if (currentlyEvaluating.get() == null) {
// Only synchronize if we need to update what's currently evaluating
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
index 4cb9722f76e..67297b4ec4b 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
@@ -130,6 +130,7 @@ public void prepare(
}
@Override
+ @SuppressWarnings("FutureReturnValueIgnored") // Run API does not block on
execution
public void run(
JobApi.RunJobRequest request, StreamObserver<JobApi.RunJobResponse>
responseObserver) {
try {
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 3f2a4772a97..2f15e5e474b 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -270,9 +270,10 @@ public void cancelShouldStopPipeline() throws Exception {
};
ExecutorService executor = Executors.newCachedThreadPool();
- executor.submit(cancelRunnable);
+ Future<?> cancelResult = executor.submit(cancelRunnable);
Future<PipelineResult> result = executor.submit(runPipelineRunnable);
+ cancelResult.get();
// If cancel doesn't work, this will hang forever
result.get().waitUntilFinish();
}
@@ -320,7 +321,7 @@ public void teardown() {
try {
Thread.sleep(1000);
} catch (final InterruptedException e) {
- fail();
+ throw new AssertionError(e);
}
TEARDOWN_CALL.set(System.nanoTime());
}
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTransformExecutorTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTransformExecutorTest.java
index 1760fb97b81..8fcd147c918 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTransformExecutorTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTransformExecutorTest.java
@@ -191,9 +191,10 @@ public void processElement(WindowedValue<String> element)
throws Exception {
completionCallback,
transformEvaluationState);
- Executors.newSingleThreadExecutor().submit(executor);
+ Future<?> future = Executors.newSingleThreadExecutor().submit(executor);
evaluatorCompleted.await();
+ future.get();
assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
assertThat(completionCallback.handledResult, equalTo(result));
@@ -201,6 +202,7 @@ public void processElement(WindowedValue<String> element)
throws Exception {
}
@Test
+ @SuppressWarnings("FutureReturnValueIgnored") // expected exception checked
via completionCallback
public void processElementThrowsExceptionCallsback() throws Exception {
final TransformResult<String> result =
StepTransformResult.<String>withoutHold(downstreamProducer).build();
@@ -241,6 +243,7 @@ public void processElement(WindowedValue<String> element)
throws Exception {
}
@Test
+ @SuppressWarnings("FutureReturnValueIgnored") // expected exception checked
via completionCallback
public void finishBundleThrowsExceptionCallsback() throws Exception {
final Exception exception = new Exception();
TransformEvaluator<String> evaluator =
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 73ec1d8320c..482ada175f0 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -262,7 +263,7 @@ public void
getExecutionContextDifferentStepsIndependentState() {
@Test
public void handleResultStoresState() {
- StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(),
ByteArrayCoder.of());
+ StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(UTF_8),
ByteArrayCoder.of());
DirectExecutionContext fooContext =
context.getExecutionContext(downstreamProducer, myKey);
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
index 9a78a044fc0..153681f0d3e 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
@@ -23,6 +23,7 @@
import com.google.common.collect.LinkedListMultimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -56,7 +57,7 @@
@Test
@Ignore("https://issues.apache.org/jira/browse/BEAM-4088 Test reliably
fails.")
- public void ensureMetricsThreadDoesntLeak() {
+ public void ensureMetricsThreadDoesntLeak() throws ExecutionException,
InterruptedException {
final DirectGraph graph =
DirectGraph.create(
emptyMap(), emptyMap(), LinkedListMultimap.create(), emptySet(),
emptyMap());
@@ -68,7 +69,7 @@ public void ensureMetricsThreadDoesntLeak() {
.build());
// fake a metrics usage
- metricsExecutorService.submit(() -> {});
+ metricsExecutorService.submit(() -> {}).get();
final EvaluationContext context =
EvaluationContext.create(
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index 0620fb4ac16..b4c8b87ab24 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.direct;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -54,7 +56,7 @@ public void setup() {
factory = new ImmutabilityEnforcementFactory();
bundleFactory = ImmutableListBundleFactory.create();
pcollection =
- p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
+ p.apply(Create.of("foo".getBytes(UTF_8),
"spamhameggs".getBytes(UTF_8)))
.apply(
ParDo.of(
new DoFn<byte[], byte[]>() {
@@ -71,7 +73,7 @@ public void processElement(ProcessContext c)
@Test
public void unchangedSucceeds() {
- WindowedValue<byte[]> element =
WindowedValue.valueInGlobalWindow("bar".getBytes());
+ WindowedValue<byte[]> element =
WindowedValue.valueInGlobalWindow("bar".getBytes(UTF_8));
CommittedBundle<byte[]> elements =
bundleFactory.createBundle(pcollection).add(element).commit(Instant.now());
@@ -86,7 +88,7 @@ public void unchangedSucceeds() {
@Test
public void mutatedDuringProcessElementThrows() {
- WindowedValue<byte[]> element =
WindowedValue.valueInGlobalWindow("bar".getBytes());
+ WindowedValue<byte[]> element =
WindowedValue.valueInGlobalWindow("bar".getBytes(UTF_8));
CommittedBundle<byte[]> elements =
bundleFactory.createBundle(pcollection).add(element).commit(Instant.now());
@@ -107,7 +109,7 @@ public void mutatedDuringProcessElementThrows() {
@Test
public void mutatedAfterProcessElementFails() {
- WindowedValue<byte[]> element =
WindowedValue.valueInGlobalWindow("bar".getBytes());
+ WindowedValue<byte[]> element =
WindowedValue.valueInGlobalWindow("bar".getBytes(UTF_8));
CommittedBundle<byte[]> elements =
bundleFactory.createBundle(pcollection).add(element).commit(Instant.now());
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
index 4236f06e64f..618b1fd79c1 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -27,6 +27,8 @@
import static org.mockito.Mockito.doAnswer;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -461,7 +463,8 @@ private CountDownLatch invokeLatchedCallback(
invocation -> {
Object callback = invocation.getArguments()[3];
final Runnable callbackRunnable = (Runnable) callback;
- Executors.newSingleThreadExecutor()
+ ListenableFuture<?> result = MoreExecutors
+ .listeningDecorator(Executors.newSingleThreadExecutor())
.submit(
() -> {
try {
@@ -471,9 +474,8 @@ private CountDownLatch invokeLatchedCallback(
callbackRunnable.run();
onComplete.countDown();
} catch (InterruptedException e) {
- fail(
- "Unexpectedly interrupted while waiting for
latch "
- + "to be counted down");
+ throw new AssertionError(
+ "Unexpectedly interrupted while waiting for
latch ", e);
}
});
return null;
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java
index 0aa2c493a82..0e887ced3c8 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java
@@ -22,8 +22,14 @@
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertThat;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +63,7 @@ public void cachedIdDeduplicatorTrueForFirstIdThenFalse() {
}
@Test
- public void cachedIdDeduplicatorMultithreaded() throws InterruptedException {
+ public void cachedIdDeduplicatorMultithreaded() throws InterruptedException,
ExecutionException {
byte[] id = new byte[] {-1, 2, 4, 22};
UnboundedReadDeduplicator dedupper = CachedIdDeduplicator.create();
final CountDownLatch startSignal = new CountDownLatch(1);
@@ -65,22 +71,25 @@ public void cachedIdDeduplicatorMultithreaded() throws
InterruptedException {
final CountDownLatch readyLatch = new CountDownLatch(numThreads);
final CountDownLatch finishLine = new CountDownLatch(numThreads);
- ExecutorService executor = Executors.newCachedThreadPool();
+ ListeningExecutorService executor = MoreExecutors.listeningDecorator(
+ Executors.newCachedThreadPool());
AtomicInteger successCount = new AtomicInteger();
AtomicInteger noOutputCount = new AtomicInteger();
+ List<ListenableFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
- executor.submit(new TryOutputIdRunnable(dedupper,
+ futures.add(executor.submit(new TryOutputIdRunnable(dedupper,
id,
successCount,
noOutputCount,
readyLatch,
startSignal,
- finishLine));
+ finishLine)));
}
readyLatch.await();
startSignal.countDown();
finishLine.await(10L, TimeUnit.SECONDS);
+ Futures.allAsList(futures).get();
executor.shutdownNow();
// The first thread to run will succeed, and no others will
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 92136606415..a4113f9113e 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.direct;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
@@ -27,11 +28,13 @@
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
+import com.google.common.base.Splitter;
import java.io.File;
-import java.io.FileReader;
import java.io.Reader;
import java.io.Serializable;
import java.nio.CharBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -108,12 +111,12 @@ public void dynamicallyReshardedWrite() throws Exception {
String filename = match.resourceId().toString();
files.add(filename);
CharBuffer buf = CharBuffer.allocate((int) new File(filename).length());
- try (Reader reader = new FileReader(filename)) {
+ try (Reader reader = Files.newBufferedReader(Paths.get(filename),
UTF_8)) {
reader.read(buf);
buf.flip();
}
- String[] readStrs = buf.toString().split("\n");
+ Iterable<String> readStrs = Splitter.on("\n").split(buf.toString());
for (String read : readStrs) {
if (read.length() > 0) {
actuals.add(read);
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTransformExecutorTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTransformExecutorTest.java
index 4eb02657fce..50d2326e949 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTransformExecutorTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTransformExecutorTest.java
@@ -33,6 +33,7 @@
import java.util.EnumSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
@@ -193,9 +194,10 @@ public void processElement(WindowedValue<String> element)
throws Exception {
completionCallback,
transformEvaluationState);
- Executors.newSingleThreadExecutor().submit(executor);
+ Future<?> future = Executors.newSingleThreadExecutor().submit(executor);
evaluatorCompleted.await();
+ future.get();
assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
assertThat(completionCallback.handledResult, Matchers.equalTo(result));
@@ -203,6 +205,7 @@ public void processElement(WindowedValue<String> element)
throws Exception {
}
@Test
+ @SuppressWarnings("FutureReturnValueIgnored") // expected exception checked
via completionCallback
public void processElementThrowsExceptionCallsback() throws Exception {
final TransformResult<String> result =
StepTransformResult.<String>withoutHold(downstreamProducer).build();
@@ -242,6 +245,7 @@ public void processElement(WindowedValue<String> element)
throws Exception {
}
@Test
+ @SuppressWarnings("FutureReturnValueIgnored") // expected exception checked
via completionCallback
public void finishBundleThrowsExceptionCallsback() throws Exception {
final Exception exception = new Exception();
TransformEvaluator<String> evaluator =
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
index 6ce38b53f1d..d41292bf31f 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct.portable;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
@@ -149,7 +150,7 @@ public void
getExecutionContextDifferentStepsIndependentState() {
@Test
public void handleResultStoresState() {
- StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(),
ByteArrayCoder.of());
+ StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(UTF_8),
ByteArrayCoder.of());
StepStateAndTimers fooContext =
context.getStateAndTimers(downstreamProducer, myKey);
StateTag<BagState<Integer>> intBag = StateTags.bag("myBag",
VarIntCoder.of());
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest.java
index e49588b2776..ef285469902 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.direct.portable.artifact;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
@@ -91,7 +92,7 @@ public void teardown() throws Exception {
@Test
public void retrieveManifest() throws Exception {
Map<String, byte[]> artifacts = new HashMap<>();
- artifacts.put("foo", "bar, baz, quux".getBytes());
+ artifacts.put("foo", "bar, baz, quux".getBytes(UTF_8));
artifacts.put("spam", new byte[] {127, -22, 5});
stageAndCreateRetrievalService(artifacts);
@@ -129,7 +130,7 @@ public void onCompleted() {
@Test
public void retrieveArtifact() throws Exception {
Map<String, byte[]> artifacts = new HashMap<>();
- byte[] fooContents = "bar, baz, quux".getBytes();
+ byte[] fooContents = "bar, baz, quux".getBytes(UTF_8);
artifacts.put("foo", fooContents);
byte[] spamContents = {127, -22, 5};
artifacts.put("spam", spamContents);
@@ -152,7 +153,8 @@ public void retrieveArtifact() throws Exception {
@Test
public void retrieveArtifactNotPresent() throws Exception {
- stageAndCreateRetrievalService(Collections.singletonMap("foo", "bar, baz,
quux".getBytes()));
+ stageAndCreateRetrievalService(Collections.singletonMap(
+ "foo", "bar, baz, quux".getBytes(UTF_8)));
final CountDownLatch completed = new CountDownLatch(1);
final AtomicReference<Throwable> thrown = new AtomicReference<>();
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactStagerServiceTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactStagerServiceTest.java
index 84b4a073b7c..e43481e51f9 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactStagerServiceTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactStagerServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.direct.portable.artifact;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@@ -79,7 +80,7 @@ public void teardown() {
@Test
public void singleDataPutArtifactSucceeds() throws Exception {
- byte[] data = "foo-bar-baz".getBytes();
+ byte[] data = "foo-bar-baz".getBytes(UTF_8);
RecordingStreamObserver<ArtifactApi.PutArtifactResponse> responseObserver =
new RecordingStreamObserver<>();
StreamObserver<ArtifactApi.PutArtifactRequest> requestObserver =
@@ -112,9 +113,9 @@ public void singleDataPutArtifactSucceeds() throws
Exception {
@Test
public void multiPartPutArtifactSucceeds() throws Exception {
- byte[] partOne = "foo-".getBytes();
- byte[] partTwo = "bar-".getBytes();
- byte[] partThree = "baz".getBytes();
+ byte[] partOne = "foo-".getBytes(UTF_8);
+ byte[] partTwo = "bar-".getBytes(UTF_8);
+ byte[] partThree = "baz".getBytes(UTF_8);
RecordingStreamObserver<ArtifactApi.PutArtifactResponse> responseObserver =
new RecordingStreamObserver<>();
StreamObserver<ArtifactApi.PutArtifactRequest> requestObserver =
@@ -158,12 +159,12 @@ public void multiPartPutArtifactSucceeds() throws
Exception {
assertThat(staged.exists(), is(true));
ByteBuffer buf = ByteBuffer.allocate("foo-bar-baz".length());
new FileInputStream(staged).getChannel().read(buf);
- Assert.assertArrayEquals("foo-bar-baz".getBytes(), buf.array());
+ Assert.assertArrayEquals("foo-bar-baz".getBytes(UTF_8), buf.array());
}
@Test
public void putArtifactBeforeNameFails() {
- byte[] data = "foo-".getBytes();
+ byte[] data = "foo-".getBytes(UTF_8);
RecordingStreamObserver<ArtifactApi.PutArtifactResponse> responseObserver =
new RecordingStreamObserver<>();
StreamObserver<ArtifactApi.PutArtifactRequest> requestObserver =
@@ -200,9 +201,9 @@ public void putArtifactWithNoContentFails() {
@Test
public void commitManifestWithAllArtifactsSucceeds() {
ArtifactApi.ArtifactMetadata firstArtifact =
- stageBytes("first-artifact", "foo, bar, baz, quux".getBytes());
+ stageBytes("first-artifact", "foo, bar, baz, quux".getBytes(UTF_8));
ArtifactApi.ArtifactMetadata secondArtifact =
- stageBytes("second-artifact", "spam, ham, eggs".getBytes());
+ stageBytes("second-artifact", "spam, ham, eggs".getBytes(UTF_8));
ArtifactApi.Manifest manifest =
ArtifactApi.Manifest.newBuilder()
@@ -227,7 +228,7 @@ public void commitManifestWithAllArtifactsSucceeds() {
@Test
public void commitManifestWithMissingArtifactFails() {
ArtifactApi.ArtifactMetadata firstArtifact =
- stageBytes("first-artifact", "foo, bar, baz, quux".getBytes());
+ stageBytes("first-artifact", "foo, bar, baz, quux".getBytes(UTF_8));
ArtifactApi.ArtifactMetadata absentArtifact =
ArtifactApi.ArtifactMetadata.newBuilder().setName("absent").build();
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalServiceTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalServiceTest.java
index ede7fe7849b..05aac15d9c8 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalServiceTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalServiceTest.java
@@ -18,8 +18,6 @@
package org.apache.beam.runners.direct.portable.artifact;
-import static org.junit.Assert.fail;
-
import io.grpc.stub.StreamObserver;
import java.util.Optional;
import java.util.concurrent.SynchronousQueue;
@@ -69,7 +67,7 @@ public void onNext(ArtifactChunk value) {
try {
thrown.put(Optional.empty());
} catch (InterruptedException e) {
- fail();
+ throw new AssertionError(e);
}
}
@@ -78,7 +76,7 @@ public void onError(Throwable t) {
try {
thrown.put(Optional.of(t));
} catch (InterruptedException e) {
- fail();
+ throw new AssertionError(e);
}
}
@@ -87,7 +85,7 @@ public void onCompleted() {
try {
thrown.put(Optional.empty());
} catch (InterruptedException e) {
- fail();
+ throw new AssertionError(e);
}
}
});
@@ -113,7 +111,7 @@ public void onNext(GetManifestResponse value) {
try {
thrown.put(Optional.empty());
} catch (InterruptedException e) {
- fail();
+ throw new AssertionError(e);
}
}
@@ -122,7 +120,7 @@ public void onError(Throwable t) {
try {
thrown.put(Optional.of(t));
} catch (InterruptedException e) {
- fail();
+ throw new AssertionError(e);
}
}
@@ -131,7 +129,7 @@ public void onCompleted() {
try {
thrown.put(Optional.empty());
} catch (InterruptedException e) {
- fail();
+ throw new AssertionError(e);
}
}
});
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
index 9867fc0affc..4de74f1792d 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.beam.runners.direct.portable.job;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.Matchers.hasItems;
import static org.junit.Assert.assertThat;
@@ -96,8 +97,8 @@ public void testPrepareJob() throws Exception {
ArtifactServiceStager stager =
ArtifactServiceStager.overChannel(
InProcessChannelBuilder.forName(stagingEndpoint.getUrl()).build());
- File foo = writeTempFile("foo", "foo, bar, baz".getBytes());
- File bar = writeTempFile("spam", "spam, ham, eggs".getBytes());
+ File foo = writeTempFile("foo", "foo, bar, baz".getBytes(UTF_8));
+ File bar = writeTempFile("spam", "spam, ham, eggs".getBytes(UTF_8));
stager.stage(
ImmutableList.of(StagedFile.of(foo, foo.getName()), StagedFile.of(bar,
bar.getName())));
List<byte[]> tempDirFiles = readFlattenedFiles(runnerTemp.getRoot());
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 112301)
Time Spent: 1h (was: 50m)
> Enforce ErrorProne analysis in direct-runner project
> ----------------------------------------------------
>
> Key: BEAM-4309
> URL: https://issues.apache.org/jira/browse/BEAM-4309
> Project: Beam
> Issue Type: Improvement
> Components: runner-direct
> Reporter: Scott Wegner
> Assignee: Scott Wegner
> Priority: Minor
> Labels: errorprone, starter
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Java ErrorProne static analysis was [recently
> enabled|https://github.com/apache/beam/pull/5161] in the Gradle build
> process, but only as warnings. ErrorProne errors are generally useful and
> easy to fix. Some work was done to [make sdks-java-core
> ErrorProne-clean|https://github.com/apache/beam/pull/5319] and add
> enforcement. This task is to clean up ErrorProne warnings and add enforcement in
> {{beam-runners-direct-java}}. Additional context discussed on the [dev
> list|https://lists.apache.org/thread.html/95aae2785c3cd728c2d3378cbdff2a7ba19caffcd4faa2049d2e2f46@%3Cdev.beam.apache.org%3E].
> Fixing this issue will involve:
> # Follow instructions in the [Contribution
> Guide|https://beam.apache.org/contribute/] to set up a {{beam}} development
> environment.
> # Run the following command to compile and run ErrorProne analysis on the
> project: {{./gradlew :beam-runners-direct-java:assemble}}
> # Fix each ErrorProne warning from the {{runners/direct-java}} project.
> # In {{runners/direct-java/build.gradle}}, add {{failOnWarning: true}} to the
> call to {{applyJavaNature()}}
> ([example|https://github.com/apache/beam/pull/5319/files#diff-9390c20635aed5f42f83b97506a87333R20]).
> This starter issue is sponsored by [~swegner]. Feel free to [reach
> out|https://beam.apache.org/community/contact-us/] with questions or code
> review:
> * JIRA: [~swegner]
> * GitHub: [@swegner|https://github.com/swegner]
> * Slack: [@Scott Wegner|https://s.apache.org/beam-slack-channel]
> * Email: swegner at google dot com
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)