Add a runners/java-fn-execution module. This contains libraries for runner authors to create Fn API services and RPCs.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/927a8db1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/927a8db1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/927a8db1 Branch: refs/heads/mr-runner Commit: 927a8db1397bc43c6cb253d6ca856afdbfa472a3 Parents: fdd5971 Author: Thomas Groh <[email protected]> Authored: Wed Oct 11 18:47:51 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Thu Nov 2 18:25:12 2017 -0700 ---------------------------------------------------------------------- pom.xml | 6 + runners/java-fn-execution/pom.xml | 105 +++++++++++++++ .../beam/runners/fnexecution/ServerFactory.java | 104 +++++++++++++++ .../beam/runners/fnexecution/package-info.java | 23 ++++ .../runners/fnexecution/ServerFactoryTest.java | 128 +++++++++++++++++++ runners/pom.xml | 1 + 6 files changed, 367 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f1eee91..b2ab5d7 100644 --- a/pom.xml +++ b/pom.xml @@ -684,6 +684,12 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-java-fn-execution</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-runners-reference-job-orchestrator</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/pom.xml ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml new file mode 100644 index 0000000..bd4fcf0 --- /dev/null +++ b/runners/java-fn-execution/pom.xml @@ -0,0 +1,105 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache 
Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-parent</artifactId> + <version>2.3.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-runners-java-fn-execution</artifactId> + + <name>Apache Beam :: Runners :: Java Fn Execution</name> + + <packaging>jar</packaging> + + <build> + <plugins> + <plugin> + <!-- Override Beam parent to allow Java8 --> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-model-pipeline</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-model-fn-execution</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-fn-execution</artifactId> 
+ </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-core</artifactId> + </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-fn-execution</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java new file mode 100644 index 0000000..918672a --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.net.HostAndPort; +import io.grpc.BindableService; +import io.grpc.Server; +import io.grpc.netty.NettyServerBuilder; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import org.apache.beam.harness.channel.SocketAddressFactory; +import org.apache.beam.model.pipeline.v1.Endpoints; + +/** + * A {@link Server gRPC server} factory. + */ +public abstract class ServerFactory { + /** + * Create a default {@link ServerFactory}. + */ + public static ServerFactory createDefault() { + return new InetSocketAddressServerFactory(); + } + + /** + * Creates an instance of this server using an ephemeral port chosen automatically. The chosen + * port is accessible to the caller from the URL set in the input {@link + * Endpoints.ApiServiceDescriptor.Builder}. + */ + public abstract Server allocatePortAndCreate( + BindableService service, Endpoints.ApiServiceDescriptor.Builder builder) throws IOException; + + /** + * Creates an instance of this server at the address specified by the given service descriptor. + */ + public abstract Server create( + BindableService service, Endpoints.ApiServiceDescriptor serviceDescriptor) throws IOException; + + /** + * Creates a {@link Server gRPC Server} using the default server factory. + * + * <p>The server is created listening any open port on "localhost". 
+ */ + public static class InetSocketAddressServerFactory extends ServerFactory { + private InetSocketAddressServerFactory() {} + + @Override + public Server allocatePortAndCreate( + BindableService service, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor) + throws IOException { + InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); + Server server = createServer(service, address); + apiServiceDescriptor.setUrl( + HostAndPort.fromParts(address.getHostName(), server.getPort()).toString()); + return server; + } + + @Override + public Server create(BindableService service, Endpoints.ApiServiceDescriptor serviceDescriptor) + throws IOException { + SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl()); + checkArgument( + socketAddress instanceof InetSocketAddress, + "%s %s requires a host:port socket address, got %s", + getClass().getSimpleName(), ServerFactory.class.getSimpleName(), + serviceDescriptor.getUrl()); + return createServer(service, (InetSocketAddress) socketAddress); + } + + private static Server createServer(BindableService service, InetSocketAddress socket) + throws IOException { + Server server = + NettyServerBuilder.forPort(socket.getPort()) + .addService(service) + // Set the message size to max value here. The actual size is governed by the + // buffer size in the layers above. 
+ .maxMessageSize(Integer.MAX_VALUE) + .build(); + server.start(); + return server; + } + + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java new file mode 100644 index 0000000..bc36f5e --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Utilities used by runners to interact with the fn execution components of the Beam Portability + * Framework. 
+ */ +package org.apache.beam.runners.fnexecution; http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java new file mode 100644 index 0000000..aa8d246 --- /dev/null +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.fnexecution; + +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +import com.google.common.net.HostAndPort; +import com.google.common.util.concurrent.Uninterruptibles; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.beam.harness.channel.ManagedChannelFactory; +import org.apache.beam.harness.test.TestStreams; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements; +import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.junit.Test; + +/** + * Tests for {@link ServerFactory}. 
+ */ +public class ServerFactoryTest { + + private static final BeamFnApi.Elements CLIENT_DATA = BeamFnApi.Elements.newBuilder() + .addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1")) + .build(); + private static final BeamFnApi.Elements SERVER_DATA = BeamFnApi.Elements.newBuilder() + .addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1")) + .build(); + + @Test + public void testCreatingDefaultServer() throws Exception { + Endpoints.ApiServiceDescriptor apiServiceDescriptor = + runTestUsing(ServerFactory.createDefault(), ManagedChannelFactory.createDefault()); + HostAndPort hostAndPort = HostAndPort.fromString(apiServiceDescriptor.getUrl()); + assertThat(hostAndPort.getHost(), anyOf( + equalTo(InetAddress.getLoopbackAddress().getHostName()), + equalTo(InetAddress.getLoopbackAddress().getHostAddress()))); + assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536))); + } + + private Endpoints.ApiServiceDescriptor runTestUsing( + ServerFactory serverFactory, ManagedChannelFactory channelFactory) throws Exception { + Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder = + Endpoints.ApiServiceDescriptor.newBuilder(); + + Collection<Elements> serverElements = new ArrayList<>(); + CountDownLatch clientHangedUp = new CountDownLatch(1); + CallStreamObserver<Elements> serverInboundObserver = + TestStreams.withOnNext(serverElements::add) + .withOnCompleted(clientHangedUp::countDown) + .build(); + TestDataService service = new TestDataService(serverInboundObserver); + Server server = serverFactory.allocatePortAndCreate(service, apiServiceDescriptorBuilder); + assertFalse(server.isShutdown()); + + ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build()); + BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub(channel); + Collection<BeamFnApi.Elements> clientElements = new ArrayList<>(); + CountDownLatch serverHangedUp = new CountDownLatch(1); + 
CallStreamObserver<BeamFnApi.Elements> clientInboundObserver = + TestStreams.withOnNext(clientElements::add) + .withOnCompleted(serverHangedUp::countDown) + .build(); + + StreamObserver<Elements> clientOutboundObserver = stub.data(clientInboundObserver); + StreamObserver<BeamFnApi.Elements> serverOutboundObserver = service.outboundObservers.take(); + + clientOutboundObserver.onNext(CLIENT_DATA); + serverOutboundObserver.onNext(SERVER_DATA); + clientOutboundObserver.onCompleted(); + clientHangedUp.await(); + serverOutboundObserver.onCompleted(); + serverHangedUp.await(); + + assertThat(clientElements, contains(SERVER_DATA)); + assertThat(serverElements, contains(CLIENT_DATA)); + + return apiServiceDescriptorBuilder.build(); + } + + /** A test gRPC service that uses the provided inbound observer for all clients. */ + private static class TestDataService extends BeamFnDataGrpc.BeamFnDataImplBase { + private final LinkedBlockingQueue<StreamObserver<BeamFnApi.Elements>> outboundObservers; + private final StreamObserver<BeamFnApi.Elements> inboundObserver; + private TestDataService(StreamObserver<BeamFnApi.Elements> inboundObserver) { + this.inboundObserver = inboundObserver; + this.outboundObservers = new LinkedBlockingQueue<>(); + } + + @Override + public StreamObserver<BeamFnApi.Elements> data( + StreamObserver<BeamFnApi.Elements> outboundObserver) { + Uninterruptibles.putUninterruptibly(outboundObservers, outboundObserver); + return inboundObserver; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/pom.xml ---------------------------------------------------------------------- diff --git a/runners/pom.xml b/runners/pom.xml index 164d1b3..df3faa9 100644 --- a/runners/pom.xml +++ b/runners/pom.xml @@ -63,6 +63,7 @@ <jdk>[1.8,)</jdk> </activation> <modules> + <module>java-fn-execution</module> <module>gearpump</module> </modules> </profile>
