Repository: beam
Updated Branches:
  refs/heads/master 34eb9b0f3 -> 5c21ba7b4


Add an Endpoints Proto file

This contains the ApiServiceDescriptor proto, which is used to
specify an endpoint to communicate with.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9bbed6d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9bbed6d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9bbed6d4

Branch: refs/heads/master
Commit: 9bbed6d441351a91720d17f1dfc4f236a96afdc5
Parents: 34eb9b0
Author: Thomas Groh <tg...@google.com>
Authored: Thu Sep 28 11:30:38 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Oct 5 11:30:28 2017 -0700

----------------------------------------------------------------------
 .../fn-api/src/main/proto/beam_fn_api.proto     | 27 ++----------
 .../runner-api/src/main/proto/endpoints.proto   | 46 ++++++++++++++++++++
 .../beam/fn/harness/BeamFnDataReadRunner.java   |  3 +-
 .../beam/fn/harness/BeamFnDataWriteRunner.java  |  3 +-
 .../org/apache/beam/fn/harness/FnHarness.java   | 19 ++++----
 .../harness/channel/ManagedChannelFactory.java  |  2 +-
 .../fn/harness/control/BeamFnControlClient.java |  5 ++-
 .../harness/control/ProcessBundleHandler.java   |  2 +-
 .../beam/fn/harness/data/BeamFnDataClient.java  |  5 ++-
 .../fn/harness/data/BeamFnDataGrpcClient.java   | 15 ++++---
 .../harness/data/BeamFnDataGrpcMultiplexer.java | 22 +++++-----
 .../fn/harness/logging/BeamFnLoggingClient.java |  7 +--
 .../state/BeamFnStateGrpcClientCache.java       |  6 +--
 .../fn/harness/BeamFnDataReadRunnerTest.java    |  3 +-
 .../fn/harness/BeamFnDataWriteRunnerTest.java   |  3 +-
 .../apache/beam/fn/harness/FnHarnessTest.java   |  7 ++-
 .../channel/ManagedChannelFactoryTest.java      |  2 +-
 .../control/BeamFnControlClientTest.java        |  7 +--
 .../control/ProcessBundleHandlerTest.java       |  2 +-
 .../harness/data/BeamFnDataGrpcClientTest.java  | 19 ++++----
 .../data/BeamFnDataGrpcMultiplexerTest.java     |  5 ++-
 .../logging/BeamFnLoggingClientTest.java        |  7 +--
 .../state/BeamFnStateGrpcClientCacheTest.java   |  4 +-
 .../portability/universal_local_runner.py       |  4 +-
 .../runners/worker/log_handler_test.py          |  3 +-
 .../runners/worker/sdk_worker_main.py           |  6 +--
 26 files changed, 137 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 9d4c5f6..5a01077 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -38,6 +38,7 @@ option java_package = "org.apache.beam.fn.v1";
 option java_outer_classname = "BeamFnApi";
 
 import "beam_runner_api.proto";
+import "endpoints.proto";
 import "google/protobuf/timestamp.proto";
 
 /*
@@ -73,7 +74,7 @@ message Target {
 message RemoteGrpcPort {
   // (Required) An API descriptor which describes where to
   // connect to including any authentication that is required.
-  ApiServiceDescriptor api_service_descriptor = 1;
+  org.apache.beam.portability.v1.ApiServiceDescriptor api_service_descriptor = 1;
 }
 
 /*
@@ -174,7 +175,7 @@ message ProcessBundleDescriptor {
   // A descriptor describing the end point to use for State API
   // calls. Required if the Runner intends to send remote references over the
   // data plane or if any of the transforms rely on user state or side inputs.
-  ApiServiceDescriptor state_api_service_descriptor = 7;
+  org.apache.beam.portability.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
 // A request to process a given bundle.
@@ -706,28 +707,6 @@ service BeamFnLogging {
 /*
  * Environment types
  */
-message ApiServiceDescriptor {
-  // (Required) A pipeline level unique id which can be used as a reference to
-  // refer to this.
-  string id = 1;
-
-  // (Required) The URL to connect to.
-  string url = 2;
-
-  // (Optional) The method for authentication. If unspecified, access to the
-  // url is already being performed in a trusted context (e.g. localhost,
-  // private network).
-  oneof authentication {
-    OAuth2ClientCredentialsGrant oauth2_client_credentials_grant = 3;
-  }
-}
-
-message OAuth2ClientCredentialsGrant {
-  // (Required) The URL to submit a "client_credentials" grant type request for
-  // an OAuth access token which will be used as a bearer token for requests.
-  string url = 1;
-}
-
 // A Docker container configuration for launching the SDK harness to execute
 // user specified functions.
 message DockerContainer {

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/common/runner-api/src/main/proto/endpoints.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/endpoints.proto b/sdks/common/runner-api/src/main/proto/endpoints.proto
new file mode 100644
index 0000000..a642e63
--- /dev/null
+++ b/sdks/common/runner-api/src/main/proto/endpoints.proto
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing endpoints containing a service.
+ */
+
+syntax = "proto3";
+
+package org.apache.beam.portability.v1;
+
+option java_package = "org.apache.beam.portability.v1";
+option java_outer_classname = "Endpoints";
+
+message ApiServiceDescriptor {
+  // (Required) The URL to connect to.
+  string url = 2;
+
+  // (Optional) The method for authentication. If unspecified, access to the
+  // url is already being performed in a trusted context (e.g. localhost,
+  // private network).
+  oneof authentication {
+    OAuth2ClientCredentialsGrant oauth2_client_credentials_grant = 3;
+  }
+}
+
+message OAuth2ClientCredentialsGrant {
+  // (Required) The URL to submit a "client_credentials" grant type request for
+  // an OAuth access token which will be used as a bearer token for requests.
+  string url = 1;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index f254ec4..4cae4f1 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -35,6 +35,7 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.portability.v1.Endpoints;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.sdk.coders.Coder;
@@ -113,7 +114,7 @@ public class BeamFnDataReadRunner<OutputT> {
     }
   }
 
-  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+  private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
   private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
   private final Supplier<String> processBundleInstructionIdSupplier;
   private final BeamFnDataClient beamFnDataClientFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index 179a228..20402f8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -34,6 +34,7 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.portability.v1.Endpoints;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.sdk.coders.Coder;
@@ -106,7 +107,7 @@ public class BeamFnDataWriteRunner<InputT> {
     }
   }
 
-  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+  private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
   private final BeamFnApi.Target outputTarget;
   private final Coder<WindowedValue<InputT>> coder;
   private final BeamFnDataClient beamFnDataClientFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 49a7a88..5ed93e4 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -32,6 +32,7 @@ import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
 import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.fn.harness.stream.StreamObserverFactory;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.portability.v1.Endpoints;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
@@ -44,10 +45,10 @@ import org.slf4j.LoggerFactory;
  * <p>This entry point expects the following environment variables:
  * <ul>
  *   <li>LOGGING_API_SERVICE_DESCRIPTOR: A
- *   {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor} encoded as text
+ *   {@link org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor} encoded as text
  *   representing the endpoint that is to be connected to for the Beam Fn Logging service.</li>
  *   <li>CONTROL_API_SERVICE_DESCRIPTOR: A
- *   {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor} encoded as text
+ *   {@link org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor} encoded as text
  *   representing the endpoint that is to be connected to for the Beam Fn Control service.</li>
  *   <li>PIPELINE_OPTIONS: A serialized form of {@link PipelineOptions}. See {@link PipelineOptions}
  *   for further details.</li>
@@ -59,10 +60,10 @@ public class FnHarness {
   private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
   private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);
 
-  private static BeamFnApi.ApiServiceDescriptor getApiServiceDescriptor(String env)
+  private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String env)
       throws TextFormat.ParseException {
-    BeamFnApi.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
-        BeamFnApi.ApiServiceDescriptor.newBuilder();
+    Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
+        Endpoints.ApiServiceDescriptor.newBuilder();
     TextFormat.merge(System.getenv(env), apiServiceDescriptorBuilder);
     return apiServiceDescriptorBuilder.build();
   }
@@ -78,18 +79,18 @@ public class FnHarness {
     PipelineOptions options = objectMapper.readValue(
         System.getenv(PIPELINE_OPTIONS), PipelineOptions.class);
 
-    BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor =
+    Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
         getApiServiceDescriptor(LOGGING_API_SERVICE_DESCRIPTOR);
 
-    BeamFnApi.ApiServiceDescriptor controlApiServiceDescriptor =
+    Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
         getApiServiceDescriptor(CONTROL_API_SERVICE_DESCRIPTOR);
 
     main(options, loggingApiServiceDescriptor, controlApiServiceDescriptor);
   }
 
   public static void main(PipelineOptions options,
-      BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor,
-      BeamFnApi.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception {
+      Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
+      Endpoints.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception {
     ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options);
     StreamObserverFactory streamObserverFactory = StreamObserverFactory.fromOptions(options);
     PrintStream originalErrStream = System.err;

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
index 3138bab..c7e60fd 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
@@ -27,7 +27,7 @@ import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.unix.DomainSocketAddress;
 import java.net.SocketAddress;
 import java.util.List;
-import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor;
+import org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
index 1c4d277..8b34f0c 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
@@ -34,6 +34,7 @@ import java.util.function.Function;
 import org.apache.beam.fn.harness.fn.ThrowingFunction;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.fn.v1.BeamFnControlGrpc;
+import org.apache.beam.portability.v1.Endpoints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,8 +66,8 @@ public class BeamFnControlClient {
   private final CompletableFuture<Void> onFinish;
 
   public BeamFnControlClient(
-      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
-      Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory,
+      Endpoints.ApiServiceDescriptor apiServiceDescriptor,
+      Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory,
       BiFunction<Function<StreamObserver<BeamFnApi.InstructionRequest>,
                           StreamObserver<BeamFnApi.InstructionResponse>>,
                  StreamObserver<BeamFnApi.InstructionRequest>,

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index e094487..c311c4c 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -44,11 +44,11 @@ import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor;
 import org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest;
 import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
 import org.apache.beam.fn.v1.BeamFnApi.StateRequest.Builder;
 import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
+import org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
index 7be96b6..a3c2f5d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.portability.v1.Endpoints;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -43,7 +44,7 @@ public interface BeamFnDataClient {
    * <p>The consumer is not required to be thread safe.
    */
   <T> CompletableFuture<Void> forInboundConsumer(
-      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+      Endpoints.ApiServiceDescriptor apiServiceDescriptor,
       KV<String, BeamFnApi.Target> inputLocation,
       Coder<WindowedValue<T>> coder,
       ThrowingConsumer<WindowedValue<T>> consumer);
@@ -58,7 +59,7 @@ public interface BeamFnDataClient {
    * <p>The returned closeable consumer is not thread safe.
    */
   <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer(
-      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+      Endpoints.ApiServiceDescriptor apiServiceDescriptor,
       KV<String, BeamFnApi.Target> outputLocation,
       Coder<WindowedValue<T>> coder);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
index 8351626..f9aebdf 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
@@ -29,6 +29,7 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.fn.v1.BeamFnDataGrpc;
+import org.apache.beam.portability.v1.Endpoints;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -44,8 +45,8 @@ import org.slf4j.LoggerFactory;
 public class BeamFnDataGrpcClient implements BeamFnDataClient {
   private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcClient.class);
 
-  private final ConcurrentMap<BeamFnApi.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer> cache;
-  private final Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory;
+  private final ConcurrentMap<Endpoints.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer> cache;
+  private final Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory;
   private final BiFunction<Function<StreamObserver<BeamFnApi.Elements>,
                                     StreamObserver<BeamFnApi.Elements>>,
                            StreamObserver<BeamFnApi.Elements>,
@@ -54,7 +55,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient {
 
   public BeamFnDataGrpcClient(
       PipelineOptions options,
-      Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory,
+      Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory,
       BiFunction<Function<StreamObserver<BeamFnApi.Elements>, StreamObserver<BeamFnApi.Elements>>,
                  StreamObserver<BeamFnApi.Elements>,
                  StreamObserver<BeamFnApi.Elements>> streamObserverFactory) {
@@ -74,7 +75,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient {
    */
   @Override
   public <T> CompletableFuture<Void> forInboundConsumer(
-      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+      Endpoints.ApiServiceDescriptor apiServiceDescriptor,
       KV<String, BeamFnApi.Target> inputLocation,
       Coder<WindowedValue<T>> coder,
       ThrowingConsumer<WindowedValue<T>> consumer) {
@@ -101,7 +102,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient {
    */
   @Override
   public <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer(
-      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+      Endpoints.ApiServiceDescriptor apiServiceDescriptor,
       KV<String, BeamFnApi.Target> outputLocation,
       Coder<WindowedValue<T>> coder) {
     BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor);
@@ -114,9 +115,9 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient {
   }
 
   private BeamFnDataGrpcMultiplexer getClientFor(
-      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor) {
+      Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
     return cache.computeIfAbsent(apiServiceDescriptor,
-        (BeamFnApi.ApiServiceDescriptor descriptor) -> new BeamFnDataGrpcMultiplexer(
+        (Endpoints.ApiServiceDescriptor descriptor) -> new BeamFnDataGrpcMultiplexer(
             descriptor,
             (StreamObserver<BeamFnApi.Elements> inboundObserver) -> streamObserverFactory.apply(
                 BeamFnDataGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::data,

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
index 8ee5491..a3c3986 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
@@ -27,26 +27,28 @@ import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.portability.v1.Endpoints;
 import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A gRPC multiplexer for a specific {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor}.
+ * A gRPC multiplexer for a specific {@link
+ * org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor}.
  *
- * <p>Multiplexes data for inbound consumers based upon their individual
- * {@link org.apache.beam.fn.v1.BeamFnApi.Target}s.
+ * <p>Multiplexes data for inbound consumers based upon their individual {@link
+ * org.apache.beam.fn.v1.BeamFnApi.Target}s.
  *
- * <p>Multiplexing inbound and outbound streams is as thread safe as the consumers of those
- * streams. For inbound streams, this is as thread safe as the inbound observers. For outbound
- * streams, this is as thread safe as the underlying stream observer.
+ * <p>Multiplexing inbound and outbound streams is as thread safe as the consumers of those streams.
+ * For inbound streams, this is as thread safe as the inbound observers. For outbound streams, this
+ * is as thread safe as the underlying stream observer.
  *
- * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying
- * the output location with a specific outbound observer.
+ * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying the output
+ * location with a specific outbound observer.
  */
 public class BeamFnDataGrpcMultiplexer {
   private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer.class);
-  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+  private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
   private final StreamObserver<BeamFnApi.Elements> inboundObserver;
   private final StreamObserver<BeamFnApi.Elements> outboundObserver;
   @VisibleForTesting
@@ -55,7 +57,7 @@ public class BeamFnDataGrpcMultiplexer {
       consumers;
 
   public BeamFnDataGrpcMultiplexer(
-      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+      Endpoints.ApiServiceDescriptor apiServiceDescriptor,
       Function<StreamObserver<BeamFnApi.Elements>,
                StreamObserver<BeamFnApi.Elements>> outboundObserverFactory) {
     this.apiServiceDescriptor = apiServiceDescriptor;

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index c9f5d80..a8f151c 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -48,6 +48,7 @@ import java.util.logging.Logger;
 import java.util.logging.SimpleFormatter;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.fn.v1.BeamFnLoggingGrpc;
+import org.apache.beam.portability.v1.Endpoints;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -94,7 +95,7 @@ public class BeamFnLoggingClient implements AutoCloseable {
    * garbage collected. java.util.logging only has weak references to the loggers
    * so if they are garbage collected, our hierarchical configuration will be lost. */
   private final Collection<Logger> configuredLoggers;
-  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+  private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
   private final ManagedChannel channel;
   private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
   private final LogControlObserver inboundObserver;
@@ -103,8 +104,8 @@ public class BeamFnLoggingClient implements AutoCloseable {
 
   public BeamFnLoggingClient(
       PipelineOptions options,
-      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
-      Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory,
+      Endpoints.ApiServiceDescriptor apiServiceDescriptor,
+      Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory,
       BiFunction<Function<StreamObserver<BeamFnApi.LogControl>,
                           StreamObserver<BeamFnApi.LogEntry.List>>,
                  StreamObserver<BeamFnApi.LogControl>,

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java
index 316e3e6..51a047a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java
@@ -28,11 +28,11 @@ import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
-import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor;
 import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
 import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
 import org.apache.beam.fn.v1.BeamFnStateGrpc;
+import org.apache.beam.portability.v1.Endpoints;
+import org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +57,7 @@ public class BeamFnStateGrpcClientCache {
   public BeamFnStateGrpcClientCache(
       PipelineOptions options,
       Supplier<String> idGenerator,
-      Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory,
+      Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory,
       BiFunction<Function<StreamObserver<StateResponse>, StreamObserver<StateRequest>>,
           StreamObserver<StateResponse>,
           StreamObserver<StateRequest>> streamObserverFactory) {

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index e5b4968..9b76fe1 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -52,6 +52,7 @@ import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.test.TestExecutors;
 import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.portability.v1.Endpoints;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -77,7 +78,7 @@ import org.mockito.MockitoAnnotations;
 public class BeamFnDataReadRunnerTest {
 
   private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
-      .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
+      .setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).build();
   private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder()
       .setPayload(PORT_SPEC.toByteString()).build();
   private static final Coder<WindowedValue<String>> CODER =

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
index c4b717a..8e9ebb8 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
@@ -49,6 +49,7 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.portability.v1.Endpoints;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -72,7 +73,7 @@ import org.mockito.MockitoAnnotations;
 public class BeamFnDataWriteRunnerTest {
 
   private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
-      .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
+      .setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).build();
   private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder()
       .setPayload(PORT_SPEC.toByteString()).build();
   private static final String CODER_ID = "string-coder-id";

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
index d92ba72..cdc4b01 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.fn.v1.BeamFnApi.InstructionResponse;
 import org.apache.beam.fn.v1.BeamFnApi.LogControl;
 import org.apache.beam.fn.v1.BeamFnControlGrpc;
 import org.apache.beam.fn.v1.BeamFnLoggingGrpc;
+import org.apache.beam.portability.v1.Endpoints;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -106,14 +107,12 @@ public class FnHarnessTest {
       Server controlServer = ServerBuilder.forPort(0).addService(controlService).build();
       controlServer.start();
       try {
-        BeamFnApi.ApiServiceDescriptor loggingDescriptor = BeamFnApi.ApiServiceDescriptor
+        Endpoints.ApiServiceDescriptor loggingDescriptor = Endpoints.ApiServiceDescriptor
             .newBuilder()
-            .setId("1L")
             .setUrl("localhost:" + loggingServer.getPort())
             .build();
-        BeamFnApi.ApiServiceDescriptor controlDescriptor = BeamFnApi.ApiServiceDescriptor
+        Endpoints.ApiServiceDescriptor controlDescriptor = Endpoints.ApiServiceDescriptor
             .newBuilder()
-            .setId("2L")
             .setUrl("localhost:" + controlServer.getPort())
             .build();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java
index 9f634c9..62bb1ba 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assume.assumeTrue;
 
 import io.grpc.ManagedChannel;
-import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor;
+import org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Rule;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
index edb7903..fedc7d4 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
@@ -42,6 +42,7 @@ import org.apache.beam.fn.harness.fn.ThrowingFunction;
 import org.apache.beam.fn.harness.test.TestStreams;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.fn.v1.BeamFnControlGrpc;
+import org.apache.beam.portability.v1.Endpoints;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -91,8 +92,8 @@ public class BeamFnControlClientTest {
         TestStreams.withOnNext(values::add)
         .withOnCompleted(() -> clientClosedStream.set(true)).build();
 
-    BeamFnApi.ApiServiceDescriptor apiServiceDescriptor =
-        BeamFnApi.ApiServiceDescriptor.newBuilder()
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
             .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
             .build();
     Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
@@ -136,7 +137,7 @@ public class BeamFnControlClientTest {
 
       BeamFnControlClient client = new BeamFnControlClient(
                 apiServiceDescriptor,
-                (BeamFnApi.ApiServiceDescriptor descriptor) -> channel,
+                (Endpoints.ApiServiceDescriptor descriptor) -> channel,
                 this::createStreamForTest,
                 handlers);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 94fa6ad..026348c 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -45,9 +45,9 @@ import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor;
 import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
 import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
+import org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
index 31eb0db..2f3bc2c 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
@@ -48,6 +48,7 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.test.TestStreams;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.fn.v1.BeamFnDataGrpc;
+import org.apache.beam.portability.v1.Endpoints;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -129,8 +130,8 @@ public class BeamFnDataGrpcClientTest {
     CallStreamObserver<BeamFnApi.Elements> inboundServerObserver =
         TestStreams.withOnNext(inboundServerValues::add).build();
 
-    BeamFnApi.ApiServiceDescriptor apiServiceDescriptor =
-        BeamFnApi.ApiServiceDescriptor.newBuilder()
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
             .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
             .build();
     Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
@@ -151,7 +152,7 @@ public class BeamFnDataGrpcClientTest {
 
     BeamFnDataGrpcClient clientFactory = new BeamFnDataGrpcClient(
         PipelineOptionsFactory.create(),
-        (BeamFnApi.ApiServiceDescriptor descriptor) -> channel,
+        (Endpoints.ApiServiceDescriptor descriptor) -> channel,
         this::createStreamForTest);
 
     CompletableFuture<Void> readFutureA = clientFactory.forInboundConsumer(
@@ -197,8 +198,8 @@ public class BeamFnDataGrpcClientTest {
     CallStreamObserver<BeamFnApi.Elements> inboundServerObserver =
         TestStreams.withOnNext(inboundServerValues::add).build();
 
-    BeamFnApi.ApiServiceDescriptor apiServiceDescriptor =
-        BeamFnApi.ApiServiceDescriptor.newBuilder()
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
             .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
             .build();
     Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
@@ -220,7 +221,7 @@ public class BeamFnDataGrpcClientTest {
 
       BeamFnDataGrpcClient clientFactory = new BeamFnDataGrpcClient(
           PipelineOptionsFactory.create(),
-          (BeamFnApi.ApiServiceDescriptor descriptor) -> channel,
+          (Endpoints.ApiServiceDescriptor descriptor) -> channel,
           this::createStreamForTest);
 
       CompletableFuture<Void> readFuture = clientFactory.forInboundConsumer(
@@ -271,8 +272,8 @@ public class BeamFnDataGrpcClientTest {
             }
             ).build();
 
-    BeamFnApi.ApiServiceDescriptor apiServiceDescriptor =
-        BeamFnApi.ApiServiceDescriptor.newBuilder()
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
             .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
             .build();
     Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
@@ -292,7 +293,7 @@ public class BeamFnDataGrpcClientTest {
       BeamFnDataGrpcClient clientFactory = new BeamFnDataGrpcClient(
           PipelineOptionsFactory.fromArgs(
               new String[]{ "--experiments=beam_fn_api_data_buffer_limit=20" }).create(),
-          (BeamFnApi.ApiServiceDescriptor descriptor) -> channel,
+          (Endpoints.ApiServiceDescriptor descriptor) -> channel,
           this::createStreamForTest);
 
       try (CloseableThrowingConsumer<WindowedValue<String>> consumer =

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
index a9095ae..fdef03d 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
@@ -32,13 +32,14 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.fn.harness.test.TestStreams;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.portability.v1.Endpoints;
 import org.apache.beam.sdk.values.KV;
 import org.junit.Test;
 
 /** Tests for {@link BeamFnDataGrpcMultiplexer}. */
 public class BeamFnDataGrpcMultiplexerTest {
-  private static final BeamFnApi.ApiServiceDescriptor DESCRIPTOR =
-      BeamFnApi.ApiServiceDescriptor.newBuilder().setUrl("test").build();
+  private static final Endpoints.ApiServiceDescriptor DESCRIPTOR =
+      Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build();
   private static final KV<String, BeamFnApi.Target> OUTPUT_LOCATION =
       KV.of(
           "777L",

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
index c2c26e7..c50695c 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -44,6 +44,7 @@ import java.util.logging.LogRecord;
 import org.apache.beam.fn.harness.test.TestStreams;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.fn.v1.BeamFnLoggingGrpc;
+import org.apache.beam.portability.v1.Endpoints;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -108,8 +109,8 @@ public class BeamFnLoggingClientTest {
           }
         }).build();
 
-    BeamFnApi.ApiServiceDescriptor apiServiceDescriptor =
-        BeamFnApi.ApiServiceDescriptor.newBuilder()
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
             .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
             .build();
     Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
@@ -133,7 +134,7 @@ public class BeamFnLoggingClientTest {
               "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
           }).create(),
           apiServiceDescriptor,
-          (BeamFnApi.ApiServiceDescriptor descriptor) -> channel,
+          (Endpoints.ApiServiceDescriptor descriptor) -> channel,
           this::createStreamForTest);
 
       // Ensure that log levels were correctly set.

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
index f0e84c7..a51e7b4 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
@@ -41,10 +41,10 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Function;
 import org.apache.beam.fn.harness.IdGenerator;
 import org.apache.beam.fn.harness.test.TestStreams;
-import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor;
 import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
 import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
 import org.apache.beam.fn.v1.BeamFnStateGrpc;
+import org.apache.beam.portability.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.After;
 import org.junit.Before;
@@ -111,7 +111,7 @@ public class BeamFnStateGrpcClientCacheTest {
         clientCache.forApiServiceDescriptor(apiServiceDescriptor));
     assertNotSame(clientCache.forApiServiceDescriptor(apiServiceDescriptor),
         clientCache.forApiServiceDescriptor(
-            ApiServiceDescriptor.newBuilder().setId("OTHER").build()));
+            ApiServiceDescriptor.getDefaultInstance()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/python/apache_beam/runners/portability/universal_local_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
index 65b66c6..579983c 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
@@ -31,9 +31,9 @@ from concurrent import futures
 import grpc
 from google.protobuf import text_format
 
-from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_job_api_pb2
 from apache_beam.portability.api import beam_job_api_pb2_grpc
+from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners import runner
 from apache_beam.runners.portability import fn_api_runner
 
@@ -337,7 +337,7 @@ class SubprocessSdkWorker(object):
 
   def run(self):
     control_descriptor = text_format.MessageToString(
-        beam_fn_api_pb2.ApiServiceDescriptor(url=self._control_address))
+        endpoints_pb2.ApiServiceDescriptor(url=self._control_address))
     p = subprocess.Popen(
         self._worker_command_line,
         shell=True,

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/python/apache_beam/runners/worker/log_handler_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py b/sdks/python/apache_beam/runners/worker/log_handler_test.py
index d2647d0..647b8b7 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler_test.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py
@@ -24,6 +24,7 @@ import grpc
 
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners.worker import log_handler
 
 
@@ -50,7 +51,7 @@ class FnApiLogRecordHandlerTest(unittest.TestCase):
     self.test_port = self.server.add_insecure_port('[::]:0')
     self.server.start()
 
-    self.logging_service_descriptor = beam_fn_api_pb2.ApiServiceDescriptor()
+    self.logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor()
     self.logging_service_descriptor.url = 'localhost:%s' % self.test_port
     self.fn_log_handler = log_handler.FnApiLogRecordHandler(
         self.logging_service_descriptor)

http://git-wip-us.apache.org/repos/asf/beam/blob/9bbed6d4/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 5dbf582..70e4c96 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -23,7 +23,7 @@ import sys
 
 from google.protobuf import text_format
 
-from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler
 from apache_beam.runners.worker.sdk_worker import SdkHarness
 
@@ -33,7 +33,7 @@ from apache_beam.runners.worker.sdk_worker import SdkHarness
 def main(unused_argv):
   """Main entry point for SDK Fn Harness."""
   if 'LOGGING_API_SERVICE_DESCRIPTOR' in os.environ:
-    logging_service_descriptor = beam_fn_api_pb2.ApiServiceDescriptor()
+    logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor()
     text_format.Merge(os.environ['LOGGING_API_SERVICE_DESCRIPTOR'],
                       logging_service_descriptor)
 
@@ -47,7 +47,7 @@ def main(unused_argv):
 
   try:
     logging.info('Python sdk harness started.')
-    service_descriptor = beam_fn_api_pb2.ApiServiceDescriptor()
+    service_descriptor = endpoints_pb2.ApiServiceDescriptor()
     text_format.Merge(os.environ['CONTROL_API_SERVICE_DESCRIPTOR'],
                       service_descriptor)
     # TODO(robertwb): Support credentials.

Reply via email to