[
https://issues.apache.org/jira/browse/BEAM-4327?focusedWorklogId=113608&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113608
]
ASF GitHub Bot logged work on BEAM-4327:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Jun/18 07:08
Start Date: 20/Jun/18 07:08
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #5647: [BEAM-4327] Fix
ErrorProne violations in fn-harness
URL: https://github.com/apache/beam/pull/5647
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/runners/java-fn-execution/build.gradle
b/runners/java-fn-execution/build.gradle
index 3713190cec6..89506ecc0ab 100644
--- a/runners/java-fn-execution/build.gradle
+++ b/runners/java-fn-execution/build.gradle
@@ -16,7 +16,7 @@
*/
apply from: project(":").file("build_rules.gradle")
-applyJavaNature()
+applyJavaNature(failOnWarning: true)
description = "Apache Beam :: Runners :: Java Fn Execution"
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
index 588153fd933..35fc10d17b3 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
@@ -81,7 +81,7 @@ private InProcessEnvironmentFactory(
}
@Override
- @SuppressWarnings("FutureReturnValueIgnored")
+ @SuppressWarnings("FutureReturnValueIgnored") // no need to monitor shutdown
thread
public RemoteEnvironment createEnvironment(Environment container) throws
Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> fnHarness =
@@ -107,6 +107,7 @@ public RemoteEnvironment createEnvironment(Environment
container) throws Excepti
e);
throw e;
}
+ return (Object) null;
});
executor.submit(
() -> {
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 0d9c3b21323..e969e5f890b 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -35,8 +35,10 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
@@ -103,6 +105,7 @@
private transient ExecutorService serverExecutor;
private transient ExecutorService sdkHarnessExecutor;
+ private transient Future<?> sdkHarnessExecutorFuture;
@Before
public void setup() throws Exception {
@@ -129,14 +132,19 @@ public void setup() throws Exception {
// Create the SDK harness, and wait until it connects
sdkHarnessExecutor = Executors.newSingleThreadExecutor(threadFactory);
- sdkHarnessExecutor.submit(
- () ->
+ sdkHarnessExecutorFuture = sdkHarnessExecutor.submit(
+ () -> {
+ try {
FnHarness.main(
PipelineOptionsFactory.create(),
loggingServer.getApiServiceDescriptor(),
controlServer.getApiServiceDescriptor(),
InProcessManagedChannelFactory.create(),
- OutboundObserverFactory.clientDirect()));
+ OutboundObserverFactory.clientDirect());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
// TODO: https://issues.apache.org/jira/browse/BEAM-4149 Use proper worker
id.
InstructionRequestHandler controlClient =
clientPool.getSource().take("", Duration.ofSeconds(2));
@@ -152,6 +160,16 @@ public void tearDown() throws Exception {
controlClient.close();
sdkHarnessExecutor.shutdownNow();
serverExecutor.shutdownNow();
+ try {
+ sdkHarnessExecutorFuture.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof RuntimeException
+ && e.getCause().getCause() instanceof InterruptedException) {
+ // expected
+ } else {
+ throw e;
+ }
+ }
}
@Test
diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle
index 31a4731d3fd..40d54e23b06 100644
--- a/sdks/java/harness/build.gradle
+++ b/sdks/java/harness/build.gradle
@@ -26,7 +26,7 @@ def dependOnProjects = [":beam-model-pipeline",
":beam-model-fn-execution", ":be
":beam-runners-core-java",
":beam-runners-core-construction-java"]
apply from: project(":").file("build_rules.gradle")
-applyJavaNature(shadowClosure: DEFAULT_SHADOW_CLOSURE <<
+applyJavaNature(failOnWarning: true, shadowClosure: DEFAULT_SHADOW_CLOSURE <<
// Create an uber jar without repackaging for the SDK harness
// TODO: We have been releasing this in the past, consider not
// releasing it since its typically bad practice to release 'all'
@@ -49,6 +49,7 @@ dependencies {
dependOnProjects.each {
compile project(path: it, configuration: "shadow")
}
+ compileOnly library.java.findbugs_annotations
compile library.java.jackson_databind
compile library.java.findbugs_jsr305
compile library.java.guava
@@ -60,6 +61,7 @@ dependencies {
compile library.java.joda_time
compile library.java.slf4j_api
provided library.java.error_prone_annotations
+ testCompileOnly library.java.findbugs_annotations
testCompile library.java.hamcrest_core
testCompile library.java.hamcrest_library
testCompile library.java.junit
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index dcd1b24d3f9..9778aac3e28 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -143,7 +143,7 @@
PipelineOptions pipelineOptions,
BeamFnDataClient beamFnDataClient,
BeamFnStateClient beamFnStateClient,
- String pTransformId,
+ String ptransformId,
PTransform pTransform,
Supplier<String> processBundleInstructionId,
Map<String, PCollection> pCollections,
@@ -175,7 +175,7 @@
new FnApiDoFnRunner<>(
pipelineOptions,
beamFnStateClient,
- pTransformId,
+ ptransformId,
processBundleInstructionId,
doFnInfo.getDoFn(),
doFnInfo.getInputCoder(),
@@ -204,7 +204,7 @@
PipelineOptions pipelineOptions,
BeamFnDataClient beamFnDataClient,
BeamFnStateClient beamFnStateClient,
- String pTransformId,
+ String ptransformId,
RunnerApi.PTransform pTransform,
Supplier<String> processBundleInstructionId,
Map<String, RunnerApi.PCollection> pCollections,
@@ -285,7 +285,7 @@
DoFnRunner<InputT, OutputT> runner = new FnApiDoFnRunner<>(
pipelineOptions,
beamFnStateClient,
- pTransformId,
+ ptransformId,
processBundleInstructionId,
doFn,
inputCoder,
@@ -654,7 +654,7 @@ public InputT element() {
@Override
public <T> T sideInput(PCollectionView<T> view) {
- return bindSideInputView(view.getTagInternal());
+ return (T) bindSideInputView(view.getTagInternal());
}
@Override
@@ -869,7 +869,7 @@ public PipelineOptions getPipelineOptions() {
@Override
public <T> T sideInput(PCollectionView<T> view) {
- return bindSideInputView(view.getTagInternal());
+ return (T) bindSideInputView(view.getTagInternal());
}
@Override
@@ -985,7 +985,7 @@ private StateKey createBagUserStateKey(String stateId) {
abstract WindowMappingFn<W> getWindowMappingFn();
}
- private <T, K, V> T bindSideInputView(TupleTag<?> view) {
+ private <K, V> Object bindSideInputView(TupleTag<?> view) {
SideInputSpec sideInputSpec = sideInputSpecMap.get(view);
checkArgument(sideInputSpec != null,
"Attempting to access unknown side input %s.",
@@ -1006,7 +1006,7 @@ private StateKey createBagUserStateKey(String stateId) {
.setPtransformId(ptransformId)
.setSideInputId(view.getId())
.setWindow(encodedWindow);
- return (T) stateKeyObjectCache.computeIfAbsent(
+ return stateKeyObjectCache.computeIfAbsent(
cacheKeyBuilder.build(),
key -> sideInputSpec.getViewFn().apply(createMultimapSideInput(
view.getId(), encodedWindow, kvCoder.getKeyCoder(),
kvCoder.getValueCoder())));
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index d362d68a4ab..6f86cf5d114 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -119,7 +119,7 @@ public static void main(
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor,
ManagedChannelFactory channelFactory,
- OutboundObserverFactory outboundObserverFactory) {
+ OutboundObserverFactory outboundObserverFactory) throws Exception {
IdGenerator idGenerator = IdGenerators.decrementingLongs();
try (BeamFnLoggingClient logging = new BeamFnLoggingClient(
options,
@@ -157,8 +157,6 @@ public static void main(
LOG.info("Entering instruction processing loop");
control.processInstructionRequests(options.as(GcsOptions.class).getExecutorService());
- } catch (Throwable t) {
- t.printStackTrace();
} finally {
System.out.println("Shutting SDK harness down.");
}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
index d0df30900d1..f913f64d792 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
@@ -25,6 +25,7 @@
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.EnumMap;
+import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -135,7 +136,7 @@ private void placePoisonPillIntoQueue() {
public void processInstructionRequests(Executor executor)
throws InterruptedException, ExecutionException {
BeamFnApi.InstructionRequest request;
- while ((request = bufferedInstructions.take()) != POISON_PILL) {
+ while (!Objects.equals((request = bufferedInstructions.take()),
POISON_PILL)) {
BeamFnApi.InstructionRequest currentRequest = request;
executor.execute(
() -> {
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 0e6c9917e69..64d713e7f9d 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -274,7 +274,7 @@ private void
createRunnerAndConsumersForPTransformRecursively(
* A {@link BeamFnStateClient} which counts the number of outstanding {@link
StateRequest}s and
* blocks till they are all finished.
*/
- private class BlockTillStateCallsFinish extends HandleStateCallsForBundle {
+ private static class BlockTillStateCallsFinish extends
HandleStateCallsForBundle {
private final BeamFnStateClient beamFnStateClient;
private final Phaser phaser;
private int currentPhase;
@@ -296,6 +296,8 @@ public void close() throws Exception {
}
@Override
+ @SuppressWarnings("FutureReturnValueIgnored") // async arriveAndDeregister
task doesn't need
+ // monitoring.
public void handle(StateRequest.Builder requestBuilder,
CompletableFuture<StateResponse> response) {
// Register each request with the phaser and arrive and deregister each
time a request
@@ -310,7 +312,7 @@ public void handle(StateRequest.Builder requestBuilder,
* A {@link BeamFnStateClient} which fails all requests because the {@link
ProcessBundleRequest}
* does not contain a State API {@link ApiServiceDescriptor}.
*/
- private class FailAllStateCallsForBundle extends HandleStateCallsForBundle {
+ private static class FailAllStateCallsForBundle extends
HandleStateCallsForBundle {
private final ProcessBundleRequest request;
private FailAllStateCallsForBundle(ProcessBundleRequest request) {
@@ -329,7 +331,8 @@ public void handle(Builder requestBuilder,
CompletableFuture<StateResponse> resp
}
}
- private abstract class HandleStateCallsForBundle implements AutoCloseable,
BeamFnStateClient {
+ private abstract static class HandleStateCallsForBundle
+ implements AutoCloseable, BeamFnStateClient {
}
private static class UnknownPTransformRunnerFactory implements
PTransformRunnerFactory<Object> {
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
index 503536a1a6f..843ea527060 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
@@ -45,11 +45,11 @@ public RegisterHandler() {
idToObject = new ConcurrentHashMap<>();
}
- public <T extends Message> T getById(String id) {
+ public Message getById(String id) {
try {
LOG.debug("Attempting to find {}", id);
@SuppressWarnings("unchecked")
- CompletableFuture<T> returnValue = (CompletableFuture<T>)
computeIfAbsent(id);
+ CompletableFuture<Message> returnValue = computeIfAbsent(id);
/*
* TODO: Even though the register request instruction occurs before the
process bundle
* instruction in the control stream, the instructions are being
processed in parallel
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index 4df6e1ec7e3..f138677b8ae 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -42,6 +42,7 @@
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
@@ -211,7 +212,7 @@ public void testReuseForMultipleBundles() throws Exception {
eq(CODER),
consumerCaptor.capture());
- executor.submit(
+ Future<?> future = executor.submit(
() -> {
// Sleep for some small amount of time simulating the parent blocking
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@@ -226,6 +227,7 @@ public void testReuseForMultipleBundles() throws Exception {
});
readRunner.blockTillReadFinishes();
+ future.get();
assertThat(valuesA, contains(valueInGlobalWindow("ABC"),
valueInGlobalWindow("DEF")));
assertThat(valuesB, contains(valueInGlobalWindow("ABC"),
valueInGlobalWindow("DEF")));
@@ -241,7 +243,7 @@ public void testReuseForMultipleBundles() throws Exception {
eq(CODER),
consumerCaptor.capture());
- executor.submit(
+ future = executor.submit(
() -> {
// Sleep for some small amount of time simulating the parent blocking
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@@ -256,6 +258,7 @@ public void testReuseForMultipleBundles() throws Exception {
});
readRunner.blockTillReadFinishes();
+ future.get();
assertThat(valuesA, contains(valueInGlobalWindow("GHI"),
valueInGlobalWindow("JKL")));
assertThat(valuesB, contains(valueInGlobalWindow("GHI"),
valueInGlobalWindow("JKL")));
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
index 0e68f6d8dd1..7c9c9551411 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
@@ -57,7 +57,8 @@
.setRegister(BeamFnApi.RegisterResponse.getDefaultInstance())
.build();
- @Test
+ @Test(timeout = 10 * 1000)
+ @SuppressWarnings("FutureReturnValueIgnored") // failure will cause test to
timeout.
public void testLaunchFnHarnessAndTeardownCleanly() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 113608)
Time Spent: 2.5h (was: 2h 20m)
> Enforce ErrorProne analysis in the java harness project
> -------------------------------------------------------
>
> Key: BEAM-4327
> URL: https://issues.apache.org/jira/browse/BEAM-4327
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-harness
> Reporter: Scott Wegner
> Assignee: Scott Wegner
> Priority: Minor
> Labels: errorprone, starter
> Fix For: 2.6.0
>
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> Java ErrorProne static analysis was [recently
> enabled|https://github.com/apache/beam/pull/5161] in the Gradle build
> process, but only as warnings. ErrorProne errors are generally useful and
> easy to fix. Some work was done to [make sdks-java-core
> ErrorProne-clean|https://github.com/apache/beam/pull/5319] and add
> enforcement. This task is clean ErrorProne warnings and add enforcement in
> {{beam-sdks-java-harness}}. Additional context discussed on the [dev
> list|https://lists.apache.org/thread.html/95aae2785c3cd728c2d3378cbdff2a7ba19caffcd4faa2049d2e2f46@%3Cdev.beam.apache.org%3E].
> Fixing this issue will involve:
> # Follow instructions in the [Contribution
> Guide|https://beam.apache.org/contribute/] to set up a {{beam}} development
> environment.
> # Run the following command to compile and run ErrorProne analysis on the
> project: {{./gradlew :beam-sdks-java-harness:assemble}}
> # Fix each ErrorProne warning from the {{sdks/java/harness}} project.
> # In {{sdks/java/harness/build.gradle}}, add {{failOnWarning: true}} to the
> call the {{applyJavaNature()}}
> ([example|https://github.com/apache/beam/pull/5319/files#diff-9390c20635aed5f42f83b97506a87333R20]).
> This starter issue is sponsored by [~swegner]. Feel free to [reach
> out|https://beam.apache.org/community/contact-us/] with questions or code
> review:
> * JIRA: [~swegner]
> * GitHub: [@swegner|https://github.com/swegner]
> * Slack: [@Scott Wegner|https://s.apache.org/beam-slack-channel]
> * Email: swegner at google dot com
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)