[
https://issues.apache.org/jira/browse/BEAM-3882?focusedWorklogId=82368&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82368
]
ASF GitHub Bot logged work on BEAM-3882:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Mar/18 17:49
Start Date: 20/Mar/18 17:49
Worklog Time Spent: 10m
Work Description: tgroh closed pull request #4900: [BEAM-3882] Fix
StateRequestHandler interface to be idiomatic
URL: https://github.com/apache/beam/pull/4900
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
Because this pull request comes from a fork, GitHub will not display its
diff after the merge, so the diff is reproduced below:
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
index bfa7480513e..183f1f11622 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
@@ -75,23 +75,19 @@ public AutoCloseable registerForProcessBundleInstructionId(
@Override
public void onNext(StateRequest request) {
- CompletionStage<StateResponse.Builder> responseStage = new
CompletableFuture<>();
- responseStage.whenCompleteAsync(
- (StateResponse.Builder responseBuilder, Throwable t) ->
- // note that this is threadsafe if and only if outboundObserver
is threadsafe.
- outboundObserver.onNext(
- t == null
- ? responseBuilder.setId(request.getId()).build()
- : StateResponse.newBuilder()
- .setId(request.getId())
- .setError(getStackTraceAsString(t))
- .build()));
StateRequestHandler handler =
- requestHandlers.getOrDefault(request.getInstructionReference(),
this::handlerNotFound);
+ requestHandlers.getOrDefault(request.getInstructionReference(),
this::handlerNotFound);
try {
- handler.accept(request, responseStage);
+ CompletionStage<StateResponse.Builder> result =
handler.handle(request);
+ result.whenCompleteAsync(
+ (StateResponse.Builder responseBuilder, Throwable t) ->
+ // note that this is threadsafe if and only if
outboundObserver is threadsafe.
+ outboundObserver.onNext(
+ t == null
+ ? responseBuilder.setId(request.getId()).build()
+ : createErrorResponse(request.getId(), t)));
} catch (Exception e) {
- responseStage.toCompletableFuture().completeExceptionally(e);
+ outboundObserver.onNext(createErrorResponse(request.getId(), e));
}
}
@@ -105,14 +101,19 @@ public void onCompleted() {
outboundObserver.onCompleted();
}
- private void handlerNotFound(
- StateRequest request, CompletionStage<StateResponse.Builder>
responseFuture) {
- responseFuture.toCompletableFuture().complete(
+ private CompletionStage<StateResponse.Builder>
handlerNotFound(StateRequest request) {
+ CompletableFuture<StateResponse.Builder> result = new
CompletableFuture<>();
+ result.complete(
StateResponse.newBuilder()
.setError(
String.format(
"Unknown process bundle instruction id '%s'",
request.getInstructionReference())));
+ return result;
+ }
+
+ private StateResponse createErrorResponse(String id, Throwable t) {
+ return
StateResponse.newBuilder().setId(id).setError(getStackTraceAsString(t)).build();
}
}
}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
index bc9f892d75d..cfa86db1b04 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
@@ -34,7 +34,6 @@
* <p>Throwing an error during handling will complete the handler result
{@link CompletionStage}
* exceptionally.
*/
- void accept(
- BeamFnApi.StateRequest request,
CompletionStage<BeamFnApi.StateResponse.Builder> result)
+ CompletionStage<BeamFnApi.StateResponse.Builder>
handle(BeamFnApi.StateRequest request)
throws Exception;
}
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/state/GrpcStateServiceTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/state/GrpcStateServiceTest.java
index 20b5425823f..07db6ee8578 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/state/GrpcStateServiceTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/state/GrpcStateServiceTest.java
@@ -20,7 +20,6 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -29,6 +28,7 @@
import io.grpc.stub.StreamObserver;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -81,7 +81,7 @@ public void testStateRequestsHandledByRegisteredHandlers()
throws Exception {
requestObserver.onNext(request);
// assert behavior
- verify(handler).accept(eq(request), any());
+ verify(handler).handle(request);
}
@Test
@@ -95,7 +95,11 @@ public void testHandlerResponseSentToStateStream() throws
Exception {
.newBuilder()
.setGet(BeamFnApi.StateGetResponse.newBuilder().setData(expectedResponseData));
StateRequestHandler dummyHandler =
- (request, result) ->
result.toCompletableFuture().complete(expectedBuilder);
+ (request) -> {
+ CompletableFuture<BeamFnApi.StateResponse.Builder> response = new
CompletableFuture<>();
+ response.complete(expectedBuilder);
+ return response;
+ };
// define observer behavior
BlockingDeque<BeamFnApi.StateResponse> responses = new
LinkedBlockingDeque<>();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 82368)
Time Spent: 1h (was: 50m)
> Make StateRequestHandler::accept interface idiomatic.
> -----------------------------------------------------
>
> Key: BEAM-3882
> URL: https://issues.apache.org/jira/browse/BEAM-3882
> Project: Beam
> Issue Type: Improvement
> Components: runner-core
> Reporter: Axel Magnuson
> Assignee: Axel Magnuson
> Priority: Minor
> Time Spent: 1h
> Remaining Estimate: 0h
>
> StateRequestHandler was based on some Dataflow SDK source code that does not
> conform to Beam coding conventions. In particular, the accept method takes
> its return value as a parameter, which is a code smell.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)