Repository: beam Updated Branches: refs/heads/master 34eb9b0f3 -> 5c21ba7b4
Add an Endpoints Proto file This contains the APIServiceDescriptor proto, which is used for specifying an endpoint to communicate to. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9bbed6d4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9bbed6d4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9bbed6d4 Branch: refs/heads/master Commit: 9bbed6d441351a91720d17f1dfc4f236a96afdc5 Parents: 34eb9b0 Author: Thomas Groh <tg...@google.com> Authored: Thu Sep 28 11:30:38 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Thu Oct 5 11:30:28 2017 -0700 ---------------------------------------------------------------------- .../fn-api/src/main/proto/beam_fn_api.proto | 27 ++---------- .../runner-api/src/main/proto/endpoints.proto | 46 ++++++++++++++++++++ .../beam/fn/harness/BeamFnDataReadRunner.java | 3 +- .../beam/fn/harness/BeamFnDataWriteRunner.java | 3 +- .../org/apache/beam/fn/harness/FnHarness.java | 19 ++++---- .../harness/channel/ManagedChannelFactory.java | 2 +- .../fn/harness/control/BeamFnControlClient.java | 5 ++- .../harness/control/ProcessBundleHandler.java | 2 +- .../beam/fn/harness/data/BeamFnDataClient.java | 5 ++- .../fn/harness/data/BeamFnDataGrpcClient.java | 15 ++++--- .../harness/data/BeamFnDataGrpcMultiplexer.java | 22 +++++----- .../fn/harness/logging/BeamFnLoggingClient.java | 7 +-- .../state/BeamFnStateGrpcClientCache.java | 6 +-- .../fn/harness/BeamFnDataReadRunnerTest.java | 3 +- .../fn/harness/BeamFnDataWriteRunnerTest.java | 3 +- .../apache/beam/fn/harness/FnHarnessTest.java | 7 ++- .../channel/ManagedChannelFactoryTest.java | 2 +- .../control/BeamFnControlClientTest.java | 7 +-- .../control/ProcessBundleHandlerTest.java | 2 +- .../harness/data/BeamFnDataGrpcClientTest.java | 19 ++++---- .../data/BeamFnDataGrpcMultiplexerTest.java | 5 ++- .../logging/BeamFnLoggingClientTest.java | 7 +-- .../state/BeamFnStateGrpcClientCacheTest.java | 4 +- .../portability/universal_local_runner.py | 4 +- .../runners/worker/log_handler_test.py | 3 +- .../runners/worker/sdk_worker_main.py | 6 +-- 26 files changed, 137 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/common/fn-api/src/main/proto/beam_fn_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto index 9d4c5f6..5a01077 100644 --- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto +++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto @@ -38,6 +38,7 @@ option java_package = "org.apache.beam.fn.v1"; option java_outer_classname = "BeamFnApi"; import "beam_runner_api.proto"; +import "endpoints.proto"; import "google/protobuf/timestamp.proto"; /* @@ -73,7 +74,7 @@ message Target { message RemoteGrpcPort { // (Required) An API descriptor which describes where to // connect to including any authentication that is required. - ApiServiceDescriptor api_service_descriptor = 1; + org.apache.beam.portability.v1.ApiServiceDescriptor api_service_descriptor = 1; } /* @@ -174,7 +175,7 @@ message ProcessBundleDescriptor { // A descriptor describing the end point to use for State API // calls. Required if the Runner intends to send remote references over the // data plane or if any of the transforms rely on user state or side inputs. - ApiServiceDescriptor state_api_service_descriptor = 7; + org.apache.beam.portability.v1.ApiServiceDescriptor state_api_service_descriptor = 7; } // A request to process a given bundle. @@ -706,28 +707,6 @@ service BeamFnLogging { /* * Environment types */ -message ApiServiceDescriptor { - // (Required) A pipeline level unique id which can be used as a reference to - // refer to this. - string id = 1; - - // (Required) The URL to connect to. - string url = 2; - - // (Optional) The method for authentication. If unspecified, access to the - // url is already being performed in a trusted context (e.g. localhost, - // private network). - oneof authentication { - OAuth2ClientCredentialsGrant oauth2_client_credentials_grant = 3; - } -} - -message OAuth2ClientCredentialsGrant { - // (Required) The URL to submit a "client_credentials" grant type request for - // an OAuth access token which will be used as a bearer token for requests. - string url = 1; -} - // A Docker container configuration for launching the SDK harness to execute // user specified functions. message DockerContainer { http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/common/runner-api/src/main/proto/endpoints.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/endpoints.proto b/sdks/common/runner-api/src/main/proto/endpoints.proto new file mode 100644 index 0000000..a642e63 --- /dev/null +++ b/sdks/common/runner-api/src/main/proto/endpoints.proto @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Protocol Buffers describing endpoints containing a service. + */ + +syntax = "proto3"; + +package org.apache.beam.portability.v1; + +option java_package = "org.apache.beam.portability.v1"; +option java_outer_classname = "Endpoints"; + +message ApiServiceDescriptor { + // (Required) The URL to connect to. + string url = 2; + + // (Optional) The method for authentication. If unspecified, access to the + // url is already being performed in a trusted context (e.g. localhost, + // private network). + oneof authentication { + OAuth2ClientCredentialsGrant oauth2_client_credentials_grant = 3; + } +} + +message OAuth2ClientCredentialsGrant { + // (Required) The URL to submit a "client_credentials" grant type request for + // an OAuth access token which will be used as a bearer token for requests. + string url = 1; +} http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java index f254ec4..4cae4f1 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java @@ -35,6 +35,7 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.harness.state.BeamFnStateClient; import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.portability.v1.Endpoints; import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.runners.core.construction.RehydratedComponents; import org.apache.beam.sdk.coders.Coder; @@ -113,7 +114,7 @@ public class BeamFnDataReadRunner<OutputT> { } } - private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor; + private final Endpoints.ApiServiceDescriptor apiServiceDescriptor; private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers; private final Supplier<String> processBundleInstructionIdSupplier; private final BeamFnDataClient beamFnDataClientFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java index 179a228..20402f8 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java @@ -34,6 +34,7 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.harness.state.BeamFnStateClient; import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.portability.v1.Endpoints; import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.runners.core.construction.RehydratedComponents; import org.apache.beam.sdk.coders.Coder; @@ -106,7 +107,7 @@ public class BeamFnDataWriteRunner<InputT> { } } - private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor; + private final Endpoints.ApiServiceDescriptor apiServiceDescriptor; private final BeamFnApi.Target outputTarget; private final Coder<WindowedValue<InputT>> coder; private final BeamFnDataClient beamFnDataClientFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 49a7a88..5ed93e4 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -32,6 +32,7 @@ import org.apache.beam.fn.harness.logging.BeamFnLoggingClient; import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache; import org.apache.beam.fn.harness.stream.StreamObserverFactory; import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.portability.v1.Endpoints; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.common.ReflectHelpers; @@ -44,10 +45,10 @@ import org.slf4j.LoggerFactory; * <p>This entry point expects the following environment variables: * <ul> * <li>LOGGING_API_SERVICE_DESCRIPTOR: A - * {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor} encoded as text + * {@link org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor} encoded as text * representing the endpoint that is to be connected to for the Beam Fn Logging service.</li> * <li>CONTROL_API_SERVICE_DESCRIPTOR: A - * {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor} encoded as text + * {@link org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor} encoded as text * representing the endpoint that is to be connected to for the Beam Fn Control service.</li> * <li>PIPELINE_OPTIONS: A serialized form of {@link PipelineOptions}. See {@link PipelineOptions} * for further details.</li> @@ -59,10 +60,10 @@ public class FnHarness { private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS"; private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class); - private static BeamFnApi.ApiServiceDescriptor getApiServiceDescriptor(String env) + private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String env) throws TextFormat.ParseException { - BeamFnApi.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder = - BeamFnApi.ApiServiceDescriptor.newBuilder(); + Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder = + Endpoints.ApiServiceDescriptor.newBuilder(); TextFormat.merge(System.getenv(env), apiServiceDescriptorBuilder); return apiServiceDescriptorBuilder.build(); } @@ -78,18 +79,18 @@ public class FnHarness { PipelineOptions options = objectMapper.readValue( System.getenv(PIPELINE_OPTIONS), PipelineOptions.class); - BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor = + Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor = getApiServiceDescriptor(LOGGING_API_SERVICE_DESCRIPTOR); - BeamFnApi.ApiServiceDescriptor controlApiServiceDescriptor = + Endpoints.ApiServiceDescriptor controlApiServiceDescriptor = getApiServiceDescriptor(CONTROL_API_SERVICE_DESCRIPTOR); main(options, loggingApiServiceDescriptor, controlApiServiceDescriptor); } public static void main(PipelineOptions options, - BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor, - BeamFnApi.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception { + Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor, + Endpoints.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception { ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options); StreamObserverFactory streamObserverFactory = StreamObserverFactory.fromOptions(options); PrintStream originalErrStream = System.err; http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java index 3138bab..c7e60fd 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java @@ -27,7 +27,7 @@ import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.unix.DomainSocketAddress; import java.net.SocketAddress; import java.util.List; -import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor; +import org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.sdk.options.PipelineOptions; http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java index 1c4d277..8b34f0c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java @@ -34,6 +34,7 @@ import java.util.function.Function; import org.apache.beam.fn.harness.fn.ThrowingFunction; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.fn.v1.BeamFnControlGrpc; +import org.apache.beam.portability.v1.Endpoints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,8 +66,8 @@ public class BeamFnControlClient { private final CompletableFuture<Void> onFinish; public BeamFnControlClient( - BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, - Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory, + Endpoints.ApiServiceDescriptor apiServiceDescriptor, + Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory, BiFunction<Function<StreamObserver<BeamFnApi.InstructionRequest>, StreamObserver<BeamFnApi.InstructionResponse>>, StreamObserver<BeamFnApi.InstructionRequest>, http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index e094487..c311c4c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -44,11 +44,11 @@ import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.harness.state.BeamFnStateClient; import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache; import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor; import org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest; import org.apache.beam.fn.v1.BeamFnApi.StateRequest; import org.apache.beam.fn.v1.BeamFnApi.StateRequest.Builder; import org.apache.beam.fn.v1.BeamFnApi.StateResponse; +import org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java index 7be96b6..a3c2f5d 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.portability.v1.Endpoints; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -43,7 +44,7 @@ public interface BeamFnDataClient { * <p>The consumer is not required to be thread safe. */ <T> CompletableFuture<Void> forInboundConsumer( - BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, + Endpoints.ApiServiceDescriptor apiServiceDescriptor, KV<String, BeamFnApi.Target> inputLocation, Coder<WindowedValue<T>> coder, ThrowingConsumer<WindowedValue<T>> consumer); @@ -58,7 +59,7 @@ public interface BeamFnDataClient { * <p>The returned closeable consumer is not thread safe. */ <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer( - BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, + Endpoints.ApiServiceDescriptor apiServiceDescriptor, KV<String, BeamFnApi.Target> outputLocation, Coder<WindowedValue<T>> coder); } http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java index 8351626..f9aebdf 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java @@ -29,6 +29,7 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.fn.v1.BeamFnDataGrpc; +import org.apache.beam.portability.v1.Endpoints; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.WindowedValue; @@ -44,8 +45,8 @@ import org.slf4j.LoggerFactory; public class BeamFnDataGrpcClient implements BeamFnDataClient { private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcClient.class); - private final ConcurrentMap<BeamFnApi.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer> cache; - private final Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory; + private final ConcurrentMap<Endpoints.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer> cache; + private final Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory; private final BiFunction<Function<StreamObserver<BeamFnApi.Elements>, StreamObserver<BeamFnApi.Elements>>, StreamObserver<BeamFnApi.Elements>, @@ -54,7 +55,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient { public BeamFnDataGrpcClient( PipelineOptions options, - Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory, + Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory, BiFunction<Function<StreamObserver<BeamFnApi.Elements>, StreamObserver<BeamFnApi.Elements>>, StreamObserver<BeamFnApi.Elements>, StreamObserver<BeamFnApi.Elements>> streamObserverFactory) { @@ -74,7 +75,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient { */ @Override public <T> CompletableFuture<Void> forInboundConsumer( - BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, + Endpoints.ApiServiceDescriptor apiServiceDescriptor, KV<String, BeamFnApi.Target> inputLocation, Coder<WindowedValue<T>> coder, ThrowingConsumer<WindowedValue<T>> consumer) { @@ -101,7 +102,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient { */ @Override public <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer( - BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, + Endpoints.ApiServiceDescriptor apiServiceDescriptor, KV<String, BeamFnApi.Target> outputLocation, Coder<WindowedValue<T>> coder) { BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor); @@ -114,9 +115,9 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient { } private BeamFnDataGrpcMultiplexer getClientFor( - BeamFnApi.ApiServiceDescriptor apiServiceDescriptor) { + Endpoints.ApiServiceDescriptor apiServiceDescriptor) { return cache.computeIfAbsent(apiServiceDescriptor, - (BeamFnApi.ApiServiceDescriptor descriptor) -> new BeamFnDataGrpcMultiplexer( + (Endpoints.ApiServiceDescriptor descriptor) -> new BeamFnDataGrpcMultiplexer( descriptor, (StreamObserver<BeamFnApi.Elements> inboundObserver) -> streamObserverFactory.apply( BeamFnDataGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::data, http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java index 8ee5491..a3c3986 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java @@ -27,26 +27,28 @@ import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.function.Function; import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.portability.v1.Endpoints; import org.apache.beam.sdk.values.KV; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A gRPC multiplexer for a specific {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor}. + * A gRPC multiplexer for a specific {@link + * org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor}. * - * <p>Multiplexes data for inbound consumers based upon their individual - * {@link org.apache.beam.fn.v1.BeamFnApi.Target}s. + * <p>Multiplexes data for inbound consumers based upon their individual {@link + * org.apache.beam.fn.v1.BeamFnApi.Target}s. * - * <p>Multiplexing inbound and outbound streams is as thread safe as the consumers of those - * streams. For inbound streams, this is as thread safe as the inbound observers. For outbound - * streams, this is as thread safe as the underlying stream observer. + * <p>Multiplexing inbound and outbound streams is as thread safe as the consumers of those streams. + * For inbound streams, this is as thread safe as the inbound observers. For outbound streams, this + * is as thread safe as the underlying stream observer. * - * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying - * the output location with a specific outbound observer. + * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying the output + * location with a specific outbound observer. */ public class BeamFnDataGrpcMultiplexer { private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer.class); - private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor; + private final Endpoints.ApiServiceDescriptor apiServiceDescriptor; private final StreamObserver<BeamFnApi.Elements> inboundObserver; private final StreamObserver<BeamFnApi.Elements> outboundObserver; @VisibleForTesting @@ -55,7 +57,7 @@ public class BeamFnDataGrpcMultiplexer { consumers; public BeamFnDataGrpcMultiplexer( - BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, + Endpoints.ApiServiceDescriptor apiServiceDescriptor, Function<StreamObserver<BeamFnApi.Elements>, StreamObserver<BeamFnApi.Elements>> outboundObserverFactory) { this.apiServiceDescriptor = apiServiceDescriptor; http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java index c9f5d80..a8f151c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java @@ -48,6 +48,7 @@ import java.util.logging.Logger; import java.util.logging.SimpleFormatter; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.fn.v1.BeamFnLoggingGrpc; +import org.apache.beam.portability.v1.Endpoints; import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -94,7 +95,7 @@ public class BeamFnLoggingClient implements AutoCloseable { * garbage collected. java.util.logging only has weak references to the loggers * so if they are garbage collected, our hierarchical configuration will be lost. */ private final Collection<Logger> configuredLoggers; - private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor; + private final Endpoints.ApiServiceDescriptor apiServiceDescriptor; private final ManagedChannel channel; private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver; private final LogControlObserver inboundObserver; @@ -103,8 +104,8 @@ public class BeamFnLoggingClient implements AutoCloseable { public BeamFnLoggingClient( PipelineOptions options, - BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, - Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory, + Endpoints.ApiServiceDescriptor apiServiceDescriptor, + Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory, BiFunction<Function<StreamObserver<BeamFnApi.LogControl>, StreamObserver<BeamFnApi.LogEntry.List>>, StreamObserver<BeamFnApi.LogControl>, http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java index 316e3e6..51a047a 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java @@ -28,11 +28,11 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient; -import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor; import org.apache.beam.fn.v1.BeamFnApi.StateRequest; import org.apache.beam.fn.v1.BeamFnApi.StateResponse; import org.apache.beam.fn.v1.BeamFnStateGrpc; +import org.apache.beam.portability.v1.Endpoints; +import org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor; import org.apache.beam.sdk.options.PipelineOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +57,7 @@ public class BeamFnStateGrpcClientCache { public BeamFnStateGrpcClientCache( PipelineOptions options, Supplier<String> idGenerator, - Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory, + Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory, BiFunction<Function<StreamObserver<StateResponse>, StreamObserver<StateRequest>>, StreamObserver<StateResponse>, StreamObserver<StateRequest>> streamObserverFactory) { http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java index e5b4968..9b76fe1 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java @@ -52,6 +52,7 @@ import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.harness.test.TestExecutors; import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService; import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.portability.v1.Endpoints; import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -77,7 +78,7 @@ import org.mockito.MockitoAnnotations; public class BeamFnDataReadRunnerTest { private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder() - .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build(); + .setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).build(); private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder() .setPayload(PORT_SPEC.toByteString()).build(); private static final Coder<WindowedValue<String>> CODER = http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java index c4b717a..8e9ebb8 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java @@ -49,6 +49,7 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.portability.v1.Endpoints; import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -72,7 +73,7 @@ import org.mockito.MockitoAnnotations; public class BeamFnDataWriteRunnerTest { private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder() - .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build(); + .setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).build(); private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder() .setPayload(PORT_SPEC.toByteString()).build(); private static final String CODER_ID = "string-coder-id"; http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java index d92ba72..cdc4b01 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java @@ -36,6 +36,7 @@ import org.apache.beam.fn.v1.BeamFnApi.InstructionResponse; import org.apache.beam.fn.v1.BeamFnApi.LogControl; import org.apache.beam.fn.v1.BeamFnControlGrpc; import org.apache.beam.fn.v1.BeamFnLoggingGrpc; +import org.apache.beam.portability.v1.Endpoints; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -106,14 +107,12 @@ public class FnHarnessTest { Server controlServer = ServerBuilder.forPort(0).addService(controlService).build(); controlServer.start(); try { - BeamFnApi.ApiServiceDescriptor loggingDescriptor = BeamFnApi.ApiServiceDescriptor + Endpoints.ApiServiceDescriptor loggingDescriptor = Endpoints.ApiServiceDescriptor .newBuilder() - .setId("1L") .setUrl("localhost:" + loggingServer.getPort()) .build(); - BeamFnApi.ApiServiceDescriptor controlDescriptor = BeamFnApi.ApiServiceDescriptor + Endpoints.ApiServiceDescriptor controlDescriptor = Endpoints.ApiServiceDescriptor .newBuilder() - .setId("2L") .setUrl("localhost:" + controlServer.getPort()) .build(); http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java index 9f634c9..62bb1ba 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java @@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assume.assumeTrue; import io.grpc.ManagedChannel; -import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor; +import org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java index edb7903..fedc7d4 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java @@ -42,6 +42,7 @@ import org.apache.beam.fn.harness.fn.ThrowingFunction; import org.apache.beam.fn.harness.test.TestStreams; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.fn.v1.BeamFnControlGrpc; +import org.apache.beam.portability.v1.Endpoints; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -91,8 +92,8 @@ public class BeamFnControlClientTest { TestStreams.withOnNext(values::add) .withOnCompleted(() -> clientClosedStream.set(true)).build(); - BeamFnApi.ApiServiceDescriptor apiServiceDescriptor = - BeamFnApi.ApiServiceDescriptor.newBuilder() + Endpoints.ApiServiceDescriptor apiServiceDescriptor = + Endpoints.ApiServiceDescriptor.newBuilder() .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()) .build(); Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()) @@ -136,7 +137,7 @@ public class BeamFnControlClientTest { BeamFnControlClient client = new BeamFnControlClient( apiServiceDescriptor, - (BeamFnApi.ApiServiceDescriptor descriptor) -> channel, + (Endpoints.ApiServiceDescriptor descriptor) -> channel, this::createStreamForTest, handlers); http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index 94fa6ad..026348c 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -45,9 +45,9 @@ import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.harness.state.BeamFnStateClient; import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache; import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor; import org.apache.beam.fn.v1.BeamFnApi.StateRequest; import org.apache.beam.fn.v1.BeamFnApi.StateResponse; +import org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java index 31eb0db..2f3bc2c 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java @@ -48,6 +48,7 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.test.TestStreams; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.fn.v1.BeamFnDataGrpc; +import org.apache.beam.portability.v1.Endpoints; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.LengthPrefixCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -129,8 +130,8 @@ public class BeamFnDataGrpcClientTest { CallStreamObserver<BeamFnApi.Elements> inboundServerObserver = TestStreams.withOnNext(inboundServerValues::add).build(); - BeamFnApi.ApiServiceDescriptor apiServiceDescriptor = - BeamFnApi.ApiServiceDescriptor.newBuilder() + Endpoints.ApiServiceDescriptor apiServiceDescriptor = + Endpoints.ApiServiceDescriptor.newBuilder() .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()) .build(); Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()) @@ -151,7 +152,7 @@ public class BeamFnDataGrpcClientTest { BeamFnDataGrpcClient clientFactory = new BeamFnDataGrpcClient( PipelineOptionsFactory.create(), - (BeamFnApi.ApiServiceDescriptor descriptor) -> channel, + (Endpoints.ApiServiceDescriptor descriptor) -> channel, this::createStreamForTest); CompletableFuture<Void> readFutureA = clientFactory.forInboundConsumer( @@ -197,8 +198,8 @@ public class BeamFnDataGrpcClientTest { CallStreamObserver<BeamFnApi.Elements> inboundServerObserver = TestStreams.withOnNext(inboundServerValues::add).build(); - BeamFnApi.ApiServiceDescriptor apiServiceDescriptor = - BeamFnApi.ApiServiceDescriptor.newBuilder() + Endpoints.ApiServiceDescriptor apiServiceDescriptor = + Endpoints.ApiServiceDescriptor.newBuilder() .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()) .build(); Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()) @@ -220,7 +221,7 @@ public class BeamFnDataGrpcClientTest { BeamFnDataGrpcClient clientFactory = new BeamFnDataGrpcClient( PipelineOptionsFactory.create(), - (BeamFnApi.ApiServiceDescriptor descriptor) -> channel, + (Endpoints.ApiServiceDescriptor descriptor) -> channel, this::createStreamForTest); CompletableFuture<Void> readFuture = clientFactory.forInboundConsumer( @@ -271,8 +272,8 @@ public class BeamFnDataGrpcClientTest { } ).build(); - BeamFnApi.ApiServiceDescriptor apiServiceDescriptor = - BeamFnApi.ApiServiceDescriptor.newBuilder() + Endpoints.ApiServiceDescriptor apiServiceDescriptor = + Endpoints.ApiServiceDescriptor.newBuilder() .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()) .build(); Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()) @@ -292,7 +293,7 @@ public class BeamFnDataGrpcClientTest { BeamFnDataGrpcClient clientFactory = new BeamFnDataGrpcClient( PipelineOptionsFactory.fromArgs( new String[]{ "--experiments=beam_fn_api_data_buffer_limit=20" }).create(), - (BeamFnApi.ApiServiceDescriptor descriptor) -> channel, + (Endpoints.ApiServiceDescriptor descriptor) -> channel, this::createStreamForTest); try (CloseableThrowingConsumer<WindowedValue<String>> consumer = http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java index a9095ae..fdef03d 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java @@ -32,13 +32,14 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.beam.fn.harness.test.TestStreams; import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.portability.v1.Endpoints; import org.apache.beam.sdk.values.KV; import org.junit.Test; /** Tests for {@link BeamFnDataGrpcMultiplexer}. */ public class BeamFnDataGrpcMultiplexerTest { - private static final BeamFnApi.ApiServiceDescriptor DESCRIPTOR = - BeamFnApi.ApiServiceDescriptor.newBuilder().setUrl("test").build(); + private static final Endpoints.ApiServiceDescriptor DESCRIPTOR = + Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build(); private static final KV<String, BeamFnApi.Target> OUTPUT_LOCATION = KV.of( "777L", http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java index c2c26e7..c50695c 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java @@ -44,6 +44,7 @@ import java.util.logging.LogRecord; import org.apache.beam.fn.harness.test.TestStreams; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.fn.v1.BeamFnLoggingGrpc; +import org.apache.beam.portability.v1.Endpoints; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Test; import org.junit.runner.RunWith; @@ -108,8 +109,8 @@ public class BeamFnLoggingClientTest { } }).build(); - BeamFnApi.ApiServiceDescriptor apiServiceDescriptor = - BeamFnApi.ApiServiceDescriptor.newBuilder() + Endpoints.ApiServiceDescriptor apiServiceDescriptor = + Endpoints.ApiServiceDescriptor.newBuilder() .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()) .build(); Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()) @@ -133,7 +134,7 @@ public class BeamFnLoggingClientTest { "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}" }).create(), apiServiceDescriptor, - (BeamFnApi.ApiServiceDescriptor descriptor) -> channel, + (Endpoints.ApiServiceDescriptor descriptor) -> channel, this::createStreamForTest); // Ensure that log levels were correctly set. http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java index f0e84c7..a51e7b4 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java @@ -41,10 +41,10 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Function; import org.apache.beam.fn.harness.IdGenerator; import org.apache.beam.fn.harness.test.TestStreams; -import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor; import org.apache.beam.fn.v1.BeamFnApi.StateRequest; import org.apache.beam.fn.v1.BeamFnApi.StateResponse; import org.apache.beam.fn.v1.BeamFnStateGrpc; +import org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.After; import org.junit.Before; @@ -111,7 +111,7 @@ public class BeamFnStateGrpcClientCacheTest { clientCache.forApiServiceDescriptor(apiServiceDescriptor)); assertNotSame(clientCache.forApiServiceDescriptor(apiServiceDescriptor), clientCache.forApiServiceDescriptor( - ApiServiceDescriptor.newBuilder().setId("OTHER").build())); + ApiServiceDescriptor.getDefaultInstance())); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/python/apache_beam/runners/portability/universal_local_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py index 65b66c6..579983c 100644 --- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py +++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py @@ -31,9 +31,9 @@ from concurrent import futures import grpc from google.protobuf import text_format -from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_job_api_pb2 from apache_beam.portability.api import beam_job_api_pb2_grpc +from apache_beam.portability.api import endpoints_pb2 from apache_beam.runners import runner from apache_beam.runners.portability import fn_api_runner @@ -337,7 +337,7 @@ class SubprocessSdkWorker(object): def run(self): control_descriptor = text_format.MessageToString( - beam_fn_api_pb2.ApiServiceDescriptor(url=self._control_address)) + endpoints_pb2.ApiServiceDescriptor(url=self._control_address)) p = subprocess.Popen( self._worker_command_line, shell=True, http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/python/apache_beam/runners/worker/log_handler_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py b/sdks/python/apache_beam/runners/worker/log_handler_test.py index d2647d0..647b8b7 100644 --- a/sdks/python/apache_beam/runners/worker/log_handler_test.py +++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py @@ -24,6 +24,7 @@ import grpc from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc +from apache_beam.portability.api import endpoints_pb2 from apache_beam.runners.worker import log_handler @@ -50,7 +51,7 @@ class FnApiLogRecordHandlerTest(unittest.TestCase): self.test_port = self.server.add_insecure_port('[::]:0') self.server.start() - self.logging_service_descriptor = beam_fn_api_pb2.ApiServiceDescriptor() + self.logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor() self.logging_service_descriptor.url = 'localhost:%s' % self.test_port self.fn_log_handler = log_handler.FnApiLogRecordHandler( self.logging_service_descriptor) http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/python/apache_beam/runners/worker/sdk_worker_main.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 5dbf582..70e4c96 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -23,7 +23,7 @@ import sys from google.protobuf import text_format -from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import endpoints_pb2 from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler from apache_beam.runners.worker.sdk_worker import SdkHarness @@ -33,7 +33,7 @@ from apache_beam.runners.worker.sdk_worker import SdkHarness def main(unused_argv): """Main entry point for SDK Fn Harness.""" if 'LOGGING_API_SERVICE_DESCRIPTOR' in os.environ: - logging_service_descriptor = beam_fn_api_pb2.ApiServiceDescriptor() + logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor() text_format.Merge(os.environ['LOGGING_API_SERVICE_DESCRIPTOR'], logging_service_descriptor) @@ -47,7 +47,7 @@ def main(unused_argv): try: logging.info('Python sdk harness started.') - service_descriptor = beam_fn_api_pb2.ApiServiceDescriptor() + service_descriptor = endpoints_pb2.ApiServiceDescriptor() text_format.Merge(os.environ['CONTROL_API_SERVICE_DESCRIPTOR'], service_descriptor) # TODO(robertwb): Support credentials.