[
https://issues.apache.org/jira/browse/BEAM-3676?focusedWorklogId=81993&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81993
]
ASF GitHub Bot logged work on BEAM-3676:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Mar/18 19:57
Start Date: 19/Mar/18 19:57
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #4884: [BEAM-3676] Add
Portable State Service
URL: https://github.com/apache/beam/pull/4884
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
new file mode 100644
index 00000000000..bfa7480513e
--- /dev/null
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.state;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** An implementation of the Beam Fn State service. */
+public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase
+ implements StateDelegator, FnService {
+ private static final Logger LOG =
LoggerFactory.getLogger(GrpcStateService.class);
+ private final ConcurrentHashMap<String, StateRequestHandler> requestHandlers;
+
+ public GrpcStateService()
+ throws Exception {
+ this.requestHandlers = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void close() {
+ // TODO: Track multiple clients and disconnect them cleanly instead of
forcing termination
+ }
+
+ @Override
+ public StreamObserver<StateRequest> state(StreamObserver<StateResponse>
responseObserver) {
+ return new Inbound(responseObserver);
+ }
+
+ @Override
+ public AutoCloseable registerForProcessBundleInstructionId(
+ String processBundleInstructionId, StateRequestHandler handler) {
+ requestHandlers.put(processBundleInstructionId, handler);
+ return () -> requestHandlers.remove(processBundleInstructionId);
+ }
+
+ /**
+ * An inbound {@link StreamObserver} which delegates requests to registered
handlers.
+ *
+ * <p>Is only threadsafe if the outbound observer is threadsafe.
+ *
+ * <p>TODO: Handle when the client indicates completion or an error on the
inbound stream and
+ * there are pending requests.
+ */
+ private class Inbound implements StreamObserver<StateRequest> {
+ private final StreamObserver<StateResponse> outboundObserver;
+
+ Inbound(StreamObserver<StateResponse> outboundObserver) {
+ this.outboundObserver = outboundObserver;
+ }
+
+ @Override
+ public void onNext(StateRequest request) {
+ CompletionStage<StateResponse.Builder> responseStage = new
CompletableFuture<>();
+ responseStage.whenCompleteAsync(
+ (StateResponse.Builder responseBuilder, Throwable t) ->
+ // note that this is threadsafe if and only if outboundObserver
is threadsafe.
+ outboundObserver.onNext(
+ t == null
+ ? responseBuilder.setId(request.getId()).build()
+ : StateResponse.newBuilder()
+ .setId(request.getId())
+ .setError(getStackTraceAsString(t))
+ .build()));
+ StateRequestHandler handler =
+ requestHandlers.getOrDefault(request.getInstructionReference(),
this::handlerNotFound);
+ try {
+ handler.accept(request, responseStage);
+ } catch (Exception e) {
+ responseStage.toCompletableFuture().completeExceptionally(e);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ outboundObserver.onCompleted();
+ }
+
+ @Override
+ public void onCompleted() {
+ outboundObserver.onCompleted();
+ }
+
+ private void handlerNotFound(
+ StateRequest request, CompletionStage<StateResponse.Builder>
responseFuture) {
+ responseFuture.toCompletableFuture().complete(
+ StateResponse.newBuilder()
+ .setError(
+ String.format(
+ "Unknown process bundle instruction id '%s'",
+ request.getInstructionReference())));
+ }
+ }
+}
+
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateDelegator.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateDelegator.java
new file mode 100644
index 00000000000..1cf18db62c0
--- /dev/null
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateDelegator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.state;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+
+/**
+ * The {@link StateDelegator} is able to delegate {@link StateRequest}s to a set of registered
+ * handlers. Any request for an unregistered process bundle instruction id is automatically failed.
+ */
+public interface StateDelegator {
+ /**
+ * Registers the supplied handler for the given process bundle instruction id for all {@link
+ * StateRequest}s with a matching id. A handle is returned which allows one to deregister from
+ * this {@link StateDelegator}.
+ *
+ * @param processBundleInstructionId the process bundle instruction id whose requests should be
+ *     routed to {@code handler}
+ * @param handler the handler which receives the matching {@link StateRequest}s
+ * @return a handle which deregisters the handler when closed
+ */
+ AutoCloseable registerForProcessBundleInstructionId(
+ String processBundleInstructionId, StateRequestHandler handler);
+}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
new file mode 100644
index 00000000000..bc9f892d75d
--- /dev/null
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.state;
+
+import java.util.concurrent.CompletionStage;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+
+/**
+ * Handler for {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest StateRequests}.
+ */
+public interface StateRequestHandler {
+ /**
+ * Handle a {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest} asynchronously.
+ *
+ * <p>The handler is allowed to complete the future within the caller's thread if it can be
+ * completed without blocking. Otherwise the handler should delegate to another thread to perform
+ * any blocking work, completing the future when able.
+ *
+ * <p>Throwing an error during handling will complete the handler result {@link CompletionStage}
+ * exceptionally.
+ *
+ * @param request the state request to handle
+ * @param result the stage to complete with the builder of the response to {@code request}
+ * @throws Exception if the request cannot be handled
+ */
+ void accept(
+ BeamFnApi.StateRequest request, CompletionStage<BeamFnApi.StateResponse.Builder> result)
+ throws Exception;
+}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/package-info.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/package-info.java
new file mode 100644
index 00000000000..5af2ea706e0
--- /dev/null
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** State API services. */
+package org.apache.beam.runners.fnexecution.state;
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/state/GrpcStateServiceTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/state/GrpcStateServiceTest.java
new file mode 100644
index 00000000000..20b5425823f
--- /dev/null
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/state/GrpcStateServiceTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.state;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.fn.test.TestStreams;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link org.apache.beam.runners.fnexecution.state.GrpcStateService}. */
+@RunWith(JUnit4.class)
+public class GrpcStateServiceTest {
+  /** Upper bound on how long a test waits for an asynchronously delivered response. */
+  private static final long TIMEOUT_MS = 30 * 1000;
+
+  private GrpcStateService stateService;
+
+  @Mock
+  private StreamObserver<BeamFnApi.StateResponse> responseObserver;
+
+  @Mock
+  private StateRequestHandler handler;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    stateService = new GrpcStateService();
+  }
+
+  /**
+   * After a handler has been registered with
+   * {@link GrpcStateService#registerForProcessBundleInstructionId(String, StateRequestHandler)},
+   * the {@link GrpcStateService} should delegate requests through
+   * {@link GrpcStateService#state(StreamObserver)} to the registered handler.
+   */
+  @Test
+  public void testStateRequestsHandledByRegisteredHandlers() throws Exception {
+    // register handler
+    String bundleInstructionId = "bundle_instruction";
+    stateService.registerForProcessBundleInstructionId(bundleInstructionId, handler);
+
+    // open state stream
+    StreamObserver<BeamFnApi.StateRequest> requestObserver =
+        stateService.state(responseObserver);
+
+    // send state request
+    BeamFnApi.StateRequest request =
+        BeamFnApi.StateRequest.newBuilder().setInstructionReference(bundleInstructionId).build();
+    requestObserver.onNext(request);
+
+    // assert behavior
+    verify(handler).accept(eq(request), any());
+  }
+
+  /**
+   * A response builder completed by the registered handler should be built and emitted exactly
+   * once on the response stream, without completing or failing that stream.
+   */
+  @Test
+  public void testHandlerResponseSentToStateStream() throws Exception {
+    // define handler behavior
+    ByteString expectedResponseData =
+        ByteString.copyFrom("EXPECTED_RESPONSE_DATA", StandardCharsets.UTF_8);
+    String bundleInstructionId = "EXPECTED_BUNDLE_INSTRUCTION_ID";
+    BeamFnApi.StateResponse.Builder expectedBuilder =
+        BeamFnApi.StateResponse
+            .newBuilder()
+            .setGet(BeamFnApi.StateGetResponse.newBuilder().setData(expectedResponseData));
+    StateRequestHandler dummyHandler =
+        (request, result) -> result.toCompletableFuture().complete(expectedBuilder);
+
+    // define observer behavior
+    BlockingDeque<BeamFnApi.StateResponse> responses = new LinkedBlockingDeque<>();
+    StreamObserver<BeamFnApi.StateResponse> recordingResponseObserver =
+        TestStreams
+            .withOnNext(responses::add)
+            .build();
+    recordingResponseObserver = Mockito.spy(recordingResponseObserver);
+
+    // register handler
+    stateService.registerForProcessBundleInstructionId(bundleInstructionId, dummyHandler);
+
+    // open state stream
+    StreamObserver<BeamFnApi.StateRequest> requestObserver =
+        stateService.state(recordingResponseObserver);
+
+    // send state request
+    BeamFnApi.StateRequest request =
+        BeamFnApi.StateRequest.newBuilder().setInstructionReference(bundleInstructionId).build();
+    requestObserver.onNext(request);
+
+    // wait for response; poll returns null on timeout, so fail clearly instead of with an NPE
+    BeamFnApi.StateResponse response = responses.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    assertNotNull("No state response received within " + TIMEOUT_MS + " ms", response);
+
+    // assert that responses contain a built state response
+    verify(recordingResponseObserver, times(1)).onNext(any());
+    verify(recordingResponseObserver, never()).onCompleted();
+    verify(recordingResponseObserver, never()).onError(any());
+    assertThat(response.getGet().getData(), equalTo(expectedResponseData));
+  }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 81993)
Time Spent: 3h 50m (was: 3h 40m)
> FlinkRunner: Portable state service
> -----------------------------------
>
> Key: BEAM-3676
> URL: https://issues.apache.org/jira/browse/BEAM-3676
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Axel Magnuson
> Priority: Major
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> The State API is an implementation of BeamFnState that exposes pipeline state
> to SDK harnesses. Because it is used for side inputs, this service will also
> need to be tied into side inputs/outputs during the translation phase.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)