Fork Control Clients into java-fn-execution Deprecate versions in runners-core. Runner-side portability APIs should not have a dependency edge to an SDK, and use of the java-fn-execution package ensures that.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ed655be Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ed655be Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ed655be Branch: refs/heads/master Commit: 9ed655be780630e1218d185bd0d2ebfea099b988 Parents: 29399fd Author: Thomas Groh <[email protected]> Authored: Mon Nov 6 15:03:17 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Wed Nov 8 16:43:42 2017 -0800 ---------------------------------------------------------------------- .../runners/core/fn/FnApiControlClient.java | 4 + .../core/fn/FnApiControlClientPoolService.java | 8 +- .../beam/runners/core/fn/SdkHarnessClient.java | 4 + runners/java-fn-execution/pom.xml | 15 ++ .../fnexecution/control/FnApiControlClient.java | 148 ++++++++++++++++ .../control/FnApiControlClientPoolService.java | 66 +++++++ .../fnexecution/control/SdkHarnessClient.java | 173 +++++++++++++++++++ .../fnexecution/control/package-info.java | 23 +++ .../FnApiControlClientPoolServiceTest.java | 65 +++++++ .../control/FnApiControlClientTest.java | 139 +++++++++++++++ .../control/SdkHarnessClientTest.java | 96 ++++++++++ 11 files changed, 740 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java index 7546851..811444c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java @@ -39,7 +39,11 @@ import org.slf4j.LoggerFactory; * connections). * * <p>This low-level client is responsible only for correlating requests with responses. + * + * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this + * functionality. */ +@Deprecated class FnApiControlClient implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClient.class); http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java index fd28040..21fc4f7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java @@ -24,7 +24,13 @@ import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** A Fn API control service which adds incoming SDK harness connections to a pool. */ +/** + * A Fn API control service which adds incoming SDK harness connections to a pool. + * + * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this + * functionality. + */ +@Deprecated public class FnApiControlClientPoolService extends BeamFnControlGrpc.BeamFnControlImplBase { private static final Logger LOGGER = LoggerFactory.getLogger(FnApiControlClientPoolService.class); http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java index bfd1837..091dea1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java @@ -31,7 +31,11 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi; * * <p>This provides a Java-friendly wrapper around {@link FnApiControlClient} and {@link * FnDataReceiver}, which handle lower-level gRPC message wrangling. + * + * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this + * functionality. */ +@Deprecated public class SdkHarnessClient { /** http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/pom.xml ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml index 1941f49..6ff08b7 100644 --- a/runners/java-fn-execution/pom.xml +++ b/runners/java-fn-execution/pom.xml @@ -97,6 +97,17 @@ <artifactId>guava</artifactId> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + <!-- Test dependencies --> <dependency> <groupId>junit</groupId> @@ -116,5 +127,9 @@ <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java new file mode 100644 index 0000000..8133988 --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.io.Closeable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A client for the control plane of an SDK harness, which can issue requests to it over the Fn API. + * + * <p>This class presents a low-level Java API de-inverting the Fn API's gRPC layer. + * + * <p>The Fn API is inverted so the runner is the server and the SDK harness is the client, for + * firewalling reasons (the runner may execute in a more privileged environment forbidding outbound + * connections). + * + * <p>This low-level client is responsible only for correlating requests with responses. + */ +class FnApiControlClient implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClient.class); + + // All writes to this StreamObserver need to be synchronized. + private final StreamObserver<BeamFnApi.InstructionRequest> requestReceiver; + private final ResponseStreamObserver responseObserver = new ResponseStreamObserver(); + private final Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequests; + private volatile boolean isClosed; + + private FnApiControlClient(StreamObserver<BeamFnApi.InstructionRequest> requestReceiver) { + this.requestReceiver = requestReceiver; + this.outstandingRequests = new ConcurrentHashMap<>(); + } + + /** + * Returns a {@link FnApiControlClient} which will submit its requests to the provided + * observer. + * + * <p>It is the responsibility of the caller to register this object as an observer of incoming + * responses (this will generally be done as part of fulfilling the contract of a gRPC service). + */ + public static FnApiControlClient forRequestObserver( + StreamObserver<BeamFnApi.InstructionRequest> requestObserver) { + return new FnApiControlClient(requestObserver); + } + + public synchronized ListenableFuture<BeamFnApi.InstructionResponse> handle( + BeamFnApi.InstructionRequest request) { + LOG.debug("Sending InstructionRequest {}", request); + SettableFuture<BeamFnApi.InstructionResponse> resultFuture = SettableFuture.create(); + outstandingRequests.put(request.getInstructionId(), resultFuture); + requestReceiver.onNext(request); + return resultFuture; + } + + StreamObserver<BeamFnApi.InstructionResponse> asResponseObserver() { + return responseObserver; + } + + @Override + public void close() { + closeAndTerminateOutstandingRequests(new IllegalStateException("Runner closed connection")); + } + + /** Closes this client and terminates any outstanding requests exceptionally. */ + private synchronized void closeAndTerminateOutstandingRequests(Throwable cause) { + if (isClosed) { + return; + } + + // Make a copy of the map to make the view of the outstanding requests consistent. + Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequestsCopy = + new ConcurrentHashMap<>(outstandingRequests); + outstandingRequests.clear(); + isClosed = true; + + if (outstandingRequestsCopy.isEmpty()) { + requestReceiver.onCompleted(); + return; + } + requestReceiver.onError( + new StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage()))); + + LOG.error( + "{} closed, clearing outstanding requests {}", + FnApiControlClient.class.getSimpleName(), + outstandingRequestsCopy); + for (SettableFuture<BeamFnApi.InstructionResponse> outstandingRequest : + outstandingRequestsCopy.values()) { + outstandingRequest.setException(cause); + } + } + + /** + * A private view of this class as a {@link StreamObserver} for connecting as a gRPC listener. + */ + private class ResponseStreamObserver implements StreamObserver<BeamFnApi.InstructionResponse> { + /** + * Processes an incoming {@link BeamFnApi.InstructionResponse} by correlating it with the + * corresponding {@link BeamFnApi.InstructionRequest} and completes the future that was returned + * by {@link #handle}. + */ + @Override + public void onNext(BeamFnApi.InstructionResponse response) { + LOG.debug("Received InstructionResponse {}", response); + SettableFuture<BeamFnApi.InstructionResponse> completableFuture = + outstandingRequests.remove(response.getInstructionId()); + if (completableFuture != null) { + completableFuture.set(response); + } + } + + /** */ + @Override + public void onCompleted() { + closeAndTerminateOutstandingRequests( + new IllegalStateException("SDK harness closed connection")); + } + + @Override + public void onError(Throwable cause) { + LOG.error("{} received error {}", FnApiControlClient.class.getSimpleName(), cause); + closeAndTerminateOutstandingRequests(cause); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java new file mode 100644 index 0000000..37fae00 --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import io.grpc.stub.StreamObserver; +import java.util.concurrent.BlockingQueue; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A Fn API control service which adds incoming SDK harness connections to a pool. */ +public class FnApiControlClientPoolService extends BeamFnControlGrpc.BeamFnControlImplBase { + private static final Logger LOGGER = LoggerFactory.getLogger(FnApiControlClientPoolService.class); + + private final BlockingQueue<FnApiControlClient> clientPool; + + private FnApiControlClientPoolService(BlockingQueue<FnApiControlClient> clientPool) { + this.clientPool = clientPool; + } + + /** + * Creates a new {@link FnApiControlClientPoolService} which will enqueue and vend new SDK harness + * connections. + */ + public static FnApiControlClientPoolService offeringClientsToPool( + BlockingQueue<FnApiControlClient> clientPool) { + return new FnApiControlClientPoolService(clientPool); + } + + /** + * Called by gRPC for each incoming connection from an SDK harness, and enqueue an available SDK + * harness client. + * + * <p>Note: currently does not distinguish what sort of SDK it is, so a separate instance is + * required for each. + */ + @Override + public StreamObserver<BeamFnApi.InstructionResponse> control( + StreamObserver<BeamFnApi.InstructionRequest> requestObserver) { + LOGGER.info("Beam Fn Control client connected."); + FnApiControlClient newClient = FnApiControlClient.forRequestObserver(requestObserver); + try { + clientPool.put(newClient); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return newClient.asResponseObserver(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java new file mode 100644 index 0000000..5b47a58 --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.io.IOException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.runners.fnexecution.data.FnDataReceiver; + +/** + * A high-level client for an SDK harness. + * + * <p>This provides a Java-friendly wrapper around {@link FnApiControlClient} and {@link + * FnDataReceiver}, which handle lower-level gRPC message wrangling. + */ +public class SdkHarnessClient { + + /** + * A supply of unique identifiers, used internally. These must be unique across all Fn API + * clients. + */ + public interface IdGenerator { + String getId(); + } + + /** A supply of unique identifiers that are simply incrementing longs. */ + private static class CountingIdGenerator implements IdGenerator { + private final AtomicLong nextId = new AtomicLong(0L); + + @Override + public String getId() { + return String.valueOf(nextId.incrementAndGet()); + } + } + + /** + * An active bundle for a particular {@link + * BeamFnApi.ProcessBundleDescriptor}. + */ + @AutoValue + public abstract static class ActiveBundle<InputT> { + public abstract String getBundleId(); + + public abstract Future<BeamFnApi.ProcessBundleResponse> getBundleResponse(); + + public abstract FnDataReceiver<InputT> getInputReceiver(); + + public static <InputT> ActiveBundle<InputT> create( + String bundleId, + Future<BeamFnApi.ProcessBundleResponse> response, + FnDataReceiver<InputT> dataReceiver) { + return new AutoValue_SdkHarnessClient_ActiveBundle(bundleId, response, dataReceiver); + } + } + + private final IdGenerator idGenerator; + private final FnApiControlClient fnApiControlClient; + + private SdkHarnessClient( + FnApiControlClient fnApiControlClient, + IdGenerator idGenerator) { + this.idGenerator = idGenerator; + this.fnApiControlClient = fnApiControlClient; + } + + /** + * Creates a client for a particular SDK harness. It is the responsibility of the caller to ensure + * that these correspond to the same SDK harness, so control plane and data plane messages can be + * correctly associated. + */ + public static SdkHarnessClient usingFnApiClient(FnApiControlClient fnApiControlClient) { + return new SdkHarnessClient(fnApiControlClient, new CountingIdGenerator()); + } + + public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) { + return new SdkHarnessClient(fnApiControlClient, idGenerator); + } + + /** + * Registers a {@link BeamFnApi.ProcessBundleDescriptor} for future + * processing. + * + * <p>A client may block on the result future, but may also proceed without blocking. + */ + public Future<BeamFnApi.RegisterResponse> register( + Iterable<BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors) { + + // TODO: validate that all the necessary data endpoints are known + + ListenableFuture<BeamFnApi.InstructionResponse> genericResponse = + fnApiControlClient.handle( + BeamFnApi.InstructionRequest.newBuilder() + .setInstructionId(idGenerator.getId()) + .setRegister( + BeamFnApi.RegisterRequest.newBuilder() + .addAllProcessBundleDescriptor(processBundleDescriptors) + .build()) + .build()); + + return Futures.transform( + genericResponse, + new Function<BeamFnApi.InstructionResponse, BeamFnApi.RegisterResponse>() { + @Override + public BeamFnApi.RegisterResponse apply(BeamFnApi.InstructionResponse input) { + return input.getRegister(); + } + }); + } + + /** + * Start a new bundle for the given {@link + * BeamFnApi.ProcessBundleDescriptor} identifier. + * + * <p>The input channels for the returned {@link ActiveBundle} are derived from the + * instructions in the {@link BeamFnApi.ProcessBundleDescriptor}. + */ + public ActiveBundle newBundle(String processBundleDescriptorId) { + String bundleId = idGenerator.getId(); + + // TODO: acquire an input receiver from appropriate FnDataService + FnDataReceiver dataReceiver = new FnDataReceiver() { + @Override + public void accept(Object input) throws Exception { + throw new UnsupportedOperationException("Placeholder FnDataReceiver cannot accept data."); + } + + @Override + public void close() throws IOException { + // noop + } + }; + + ListenableFuture<BeamFnApi.InstructionResponse> genericResponse = + fnApiControlClient.handle( + BeamFnApi.InstructionRequest.newBuilder() + .setProcessBundle( + BeamFnApi.ProcessBundleRequest.newBuilder() + .setProcessBundleDescriptorReference(processBundleDescriptorId)) + .build()); + + ListenableFuture<BeamFnApi.ProcessBundleResponse> specificResponse = + Futures.transform( + genericResponse, + new Function<BeamFnApi.InstructionResponse, BeamFnApi.ProcessBundleResponse>() { + @Override + public BeamFnApi.ProcessBundleResponse apply(BeamFnApi.InstructionResponse input) { + return input.getProcessBundle(); + } + }); + + return ActiveBundle.create(bundleId, specificResponse, dataReceiver); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/package-info.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/package-info.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/package-info.java new file mode 100644 index 0000000..791faa2 --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Utilities for a Beam runner to interact with the Fn API {@link + * org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc Control Service} via java abstractions. + */ +package org.apache.beam.runners.fnexecution.control; http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java new file mode 100644 index 0000000..9392ee0 --- /dev/null +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link FnApiControlClientPoolService}. */ +@RunWith(JUnit4.class) +public class FnApiControlClientPoolServiceTest { + + // For ease of straight-line testing, we use a LinkedBlockingQueue; in practice a SynchronousQueue + // for matching incoming connections and server threads is likely. + private final BlockingQueue<FnApiControlClient> pool = new LinkedBlockingQueue<>(); + private FnApiControlClientPoolService controlService = + FnApiControlClientPoolService.offeringClientsToPool(pool); + + @Test + public void testIncomingConnection() throws Exception { + StreamObserver<BeamFnApi.InstructionRequest> requestObserver = mock(StreamObserver.class); + StreamObserver<BeamFnApi.InstructionResponse> responseObserver = + controlService.control(requestObserver); + + FnApiControlClient client = pool.take(); + + // Check that the client is wired up to the request channel + String id = "fakeInstruction"; + ListenableFuture<BeamFnApi.InstructionResponse> responseFuture = + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + verify(requestObserver).onNext(any(BeamFnApi.InstructionRequest.class)); + assertThat(responseFuture.isDone(), is(false)); + + // Check that the response channel really came from the client + responseObserver.onNext( + BeamFnApi.InstructionResponse.newBuilder().setInstructionId(id).build()); + responseFuture.get(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java new file mode 100644 index 0000000..4732f5e --- /dev/null +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isA; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.verify; + +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Unit tests for {@link FnApiControlClient}. */ +@RunWith(JUnit4.class) +public class FnApiControlClientTest { + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Mock public StreamObserver<BeamFnApi.InstructionRequest> mockObserver; + private FnApiControlClient client; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + client = FnApiControlClient.forRequestObserver(mockObserver); + } + + @Test + public void testRequestSent() { + String id = "instructionId"; + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + + verify(mockObserver).onNext(any(BeamFnApi.InstructionRequest.class)); + } + + @Test + public void testRequestSuccess() throws Exception { + String id = "successfulInstruction"; + + Future<BeamFnApi.InstructionResponse> responseFuture = + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + client + .asResponseObserver() + .onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId(id).build()); + + BeamFnApi.InstructionResponse response = responseFuture.get(); + + assertThat(response.getInstructionId(), equalTo(id)); + } + + @Test + public void testUnknownResponseIgnored() throws Exception { + String id = "actualInstruction"; + String unknownId = "unknownInstruction"; + + ListenableFuture<BeamFnApi.InstructionResponse> responseFuture = + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + + client + .asResponseObserver() + .onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId(unknownId).build()); + + assertThat(responseFuture.isDone(), is(false)); + assertThat(responseFuture.isCancelled(), is(false)); + } + + @Test + public void testOnCompletedCancelsOutstanding() throws Exception { + String id = "clientHangUpInstruction"; + + Future<BeamFnApi.InstructionResponse> responseFuture = + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + + client.asResponseObserver().onCompleted(); + + thrown.expect(ExecutionException.class); + thrown.expectCause(isA(IllegalStateException.class)); + thrown.expectMessage("closed"); + responseFuture.get(); + } + + @Test + public void testOnErrorCancelsOutstanding() throws Exception { + String id = "errorInstruction"; + + Future<BeamFnApi.InstructionResponse> responseFuture = + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + + class FrazzleException extends Exception {} + client.asResponseObserver().onError(new FrazzleException()); + + thrown.expect(ExecutionException.class); + thrown.expectCause(isA(FrazzleException.class)); + responseFuture.get(); + } + + @Test + public void testCloseCancelsOutstanding() throws Exception { + String id = "serverCloseInstruction"; + + Future<BeamFnApi.InstructionResponse> responseFuture = + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + + client.close(); + + thrown.expect(ExecutionException.class); + thrown.expectCause(isA(IllegalStateException.class)); + thrown.expectMessage("closed"); + responseFuture.get(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java new file mode 100644 index 0000000..09437c7 --- /dev/null +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.SettableFuture; +import java.util.concurrent.Future; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Unit tests for {@link SdkHarnessClient}. */ +@RunWith(JUnit4.class) +public class SdkHarnessClientTest { + + @Mock public FnApiControlClient fnApiControlClient; + + private SdkHarnessClient sdkHarnessClient; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + sdkHarnessClient = SdkHarnessClient.usingFnApiClient(fnApiControlClient); + } + + @Test + public void testRegisterDoesNotCrash() throws Exception { + String descriptorId1 = "descriptor1"; + String descriptorId2 = "descriptor2"; + + SettableFuture<BeamFnApi.InstructionResponse> registerResponseFuture = SettableFuture.create(); + when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) + .thenReturn(registerResponseFuture); + + Future<BeamFnApi.RegisterResponse> responseFuture = sdkHarnessClient.register( + ImmutableList.of( + BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build(), + BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(descriptorId2).build())); + + // Correlating the RegisterRequest and RegisterResponse is owned by the underlying + // FnApiControlClient. The SdkHarnessClient owns just wrapping the request and unwrapping + // the response. + // + // Currently there are no fields so there's nothing to check. This test is formulated + // to match the pattern it should have if/when the response is meaningful. + BeamFnApi.RegisterResponse response = BeamFnApi.RegisterResponse.getDefaultInstance(); + registerResponseFuture.set( + BeamFnApi.InstructionResponse.newBuilder().setRegister(response).build()); + responseFuture.get(); + } + + @Test + public void testNewBundleNoDataDoesNotCrash() throws Exception { + String descriptorId1 = "descriptor1"; + + SettableFuture<BeamFnApi.InstructionResponse> processBundleResponseFuture = + SettableFuture.create(); + when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) + .thenReturn(processBundleResponseFuture); + + SdkHarnessClient.ActiveBundle activeBundle = sdkHarnessClient.newBundle(descriptorId1); + + // Correlating the ProcessBundleRequest and ProcessBundleReponse is owned by the underlying + // FnApiControlClient. The SdkHarnessClient owns just wrapping the request and unwrapping + // the response. + // + // Currently there are no fields so there's nothing to check. This test is formulated + // to match the pattern it should have if/when the response is meaningful. + BeamFnApi.ProcessBundleResponse response = BeamFnApi.ProcessBundleResponse.getDefaultInstance(); + processBundleResponseFuture.set( + BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build()); + activeBundle.getBundleResponse().get(); + } +}
