[ 
https://issues.apache.org/jira/browse/BEAM-3418?focusedWorklogId=87171&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87171
 ]

ASF GitHub Bot logged work on BEAM-3418:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Apr/18 17:25
            Start Date: 03/Apr/18 17:25
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #4980: [BEAM-3418] Support 
multiple SDKHarness in RunnerHarness
URL: https://github.com/apache/beam/pull/4980
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index 228b593c1d8..135be3737bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -845,6 +845,12 @@
         <version>${grpc.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-context</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>io.grpc</groupId>
         <artifactId>grpc-core</artifactId>
diff --git a/runners/java-fn-execution/pom.xml 
b/runners/java-fn-execution/pom.xml
index dd82908a2f6..515801538f7 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -63,6 +63,11 @@
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-context</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-core</artifactId>
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProvider.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProvider.java
new file mode 100644
index 00000000000..71089e33f5b
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import io.grpc.Context;
+import io.grpc.Contexts;
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+
+/**
+ * A HeaderAccessorProvider which intercept the header in a GRPC request and 
expose the relevant
+ * fields.
+ */
+public class GrpcContextHeaderAccessorProvider {
+
+  private static final Key<String> WORKER_ID_KEY =
+      Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER);
+  private static final Context.Key<String> SDK_WORKER_CONTEXT_KEY = 
Context.key("worker_id");
+  private static final GrpcHeaderAccessor HEADER_ACCESSOR = new 
GrpcHeaderAccessor();
+  private static final ServerInterceptor INTERCEPTOR =
+      new ServerInterceptor() {
+        @Override
+        public <ReqT, RespT> Listener<ReqT> interceptCall(
+            ServerCall<ReqT, RespT> call,
+            Metadata requestHeaders,
+            ServerCallHandler<ReqT, RespT> next) {
+          String workerId = requestHeaders.get(WORKER_ID_KEY);
+          Context context = 
Context.current().withValue(SDK_WORKER_CONTEXT_KEY, workerId);
+          return Contexts.interceptCall(context, call, requestHeaders, next);
+        }
+      };
+
+  public static ServerInterceptor interceptor() {
+    return INTERCEPTOR;
+  }
+
+  public static HeaderAccessor getHeaderAccessor() {
+    return HEADER_ACCESSOR;
+  }
+
+  private static class GrpcHeaderAccessor implements HeaderAccessor {
+
+    @Override
+    /** This method should be called from the request method. */
+    public String getSdkWorkerId() {
+      return SDK_WORKER_CONTEXT_KEY.get();
+    }
+  }
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/HeaderAccessor.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/HeaderAccessor.java
new file mode 100644
index 00000000000..cde9044434b
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/HeaderAccessor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+/** Interface to access headers in the client request. */
+public interface HeaderAccessor {
+  /** This method should be called from the request method. */
+  String getSdkWorkerId();
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
index 535de68922d..52ce09af4e6 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
@@ -20,6 +20,7 @@
 
 import io.grpc.BindableService;
 import io.grpc.Server;
+import io.grpc.ServerInterceptors;
 import io.grpc.inprocess.InProcessServerBuilder;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -50,7 +51,8 @@ public Server allocatePortAndCreate(BindableService service, 
ApiServiceDescripto
   public Server create(BindableService service, ApiServiceDescriptor 
serviceDescriptor)
       throws IOException {
     return InProcessServerBuilder.forName(serviceDescriptor.getUrl())
-        .addService(service)
+        .addService(
+            ServerInterceptors.intercept(service, 
GrpcContextHeaderAccessorProvider.interceptor()))
         .build()
         .start();
   }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
index bb45d082dad..6db91ad9732 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -23,6 +23,7 @@
 import com.google.common.net.HostAndPort;
 import io.grpc.BindableService;
 import io.grpc.Server;
+import io.grpc.ServerInterceptors;
 import io.grpc.netty.NettyServerBuilder;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -31,13 +32,9 @@
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
 
-/**
- * A {@link Server gRPC server} factory.
- */
+/** A {@link Server gRPC server} factory. */
 public abstract class ServerFactory {
-  /**
-   * Create a default {@link ServerFactory}.
-   */
+  /** Create a default {@link ServerFactory}. */
   public static ServerFactory createDefault() {
     return new InetSocketAddressServerFactory();
   }
@@ -45,13 +42,16 @@ public static ServerFactory createDefault() {
   /**
    * Creates an instance of this server using an ephemeral port chosen 
automatically. The chosen
    * port is accessible to the caller from the URL set in the input {@link
-   * Endpoints.ApiServiceDescriptor.Builder}.
+   * Endpoints.ApiServiceDescriptor.Builder}. Server applies {@link
+   * GrpcContextHeaderAccessorProvider#interceptor()} to all incoming requests.
    */
   public abstract Server allocatePortAndCreate(
       BindableService service, Endpoints.ApiServiceDescriptor.Builder builder) 
throws IOException;
 
   /**
    * Creates an instance of this server at the address specified by the given 
service descriptor.
+   * Server applies {@link GrpcContextHeaderAccessorProvider#interceptor()} to 
all incoming
+   * requests.
    */
   public abstract Server create(
       BindableService service, Endpoints.ApiServiceDescriptor 
serviceDescriptor) throws IOException;
@@ -82,16 +82,20 @@ public Server create(BindableService service, 
Endpoints.ApiServiceDescriptor ser
       checkArgument(
           socketAddress instanceof InetSocketAddress,
           "%s %s requires a host:port socket address, got %s",
-          getClass().getSimpleName(), ServerFactory.class.getSimpleName(),
+          getClass().getSimpleName(),
+          ServerFactory.class.getSimpleName(),
           serviceDescriptor.getUrl());
       return createServer(service, (InetSocketAddress) socketAddress);
     }
 
     private static Server createServer(BindableService service, 
InetSocketAddress socket)
         throws IOException {
+      // Note: Every ServerFactory should apply 
GrpcContextHeaderAccessorProvider to the service.
       Server server =
           NettyServerBuilder.forPort(socket.getPort())
-              .addService(service)
+              .addService(
+                  ServerInterceptors.intercept(
+                      service, 
GrpcContextHeaderAccessorProvider.interceptor()))
               // Set the message size to max value here. The actual size is 
governed by the
               // buffer size in the layers above.
               .maxMessageSize(Integer.MAX_VALUE)
@@ -99,6 +103,5 @@ private static Server createServer(BindableService service, 
InetSocketAddress so
       server.start();
       return server;
     }
-
   }
 }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
index 81eb7281b20..06450f9efec 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
@@ -22,12 +22,15 @@
 import io.grpc.stub.StreamObserver;
 import java.io.Closeable;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,23 +54,25 @@
   private final ResponseStreamObserver responseObserver = new 
ResponseStreamObserver();
   private final ConcurrentMap<String, 
CompletableFuture<BeamFnApi.InstructionResponse>>
       outstandingRequests;
+  private final Set<Consumer<FnApiControlClient>> onCloseListeners = 
ConcurrentHashMap.newKeySet();
+  private final String workerId;
   private AtomicBoolean isClosed = new AtomicBoolean(false);
 
-  private FnApiControlClient(StreamObserver<BeamFnApi.InstructionRequest> 
requestReceiver) {
+  private FnApiControlClient(String workerId, 
StreamObserver<InstructionRequest> requestReceiver) {
+    this.workerId = workerId;
     this.requestReceiver = 
SynchronizedStreamObserver.wrapping(requestReceiver);
     this.outstandingRequests = new ConcurrentHashMap<>();
   }
 
   /**
-   * Returns a {@link FnApiControlClient} which will submit its requests to 
the provided
-   * observer.
+   * Returns a {@link FnApiControlClient} which will submit its requests to 
the provided observer.
    *
    * <p>It is the responsibility of the caller to register this object as an 
observer of incoming
    * responses (this will generally be done as part of fulfilling the contract 
of a gRPC service).
    */
   public static FnApiControlClient forRequestObserver(
-      StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
-    return new FnApiControlClient(requestObserver);
+      String workerId, StreamObserver<BeamFnApi.InstructionRequest> 
requestObserver) {
+    return new FnApiControlClient(workerId, requestObserver);
   }
 
   public CompletionStage<BeamFnApi.InstructionResponse> handle(
@@ -88,34 +93,48 @@ public void close() {
     closeAndTerminateOutstandingRequests(new IllegalStateException("Runner 
closed connection"));
   }
 
+  public String getWorkerId() {
+    return workerId;
+  }
+
   /** Closes this client and terminates any outstanding requests 
exceptionally. */
   private void closeAndTerminateOutstandingRequests(Throwable cause) {
     if (isClosed.getAndSet(true)) {
       return;
     }
 
-    // Make a copy of the map to make the view of the outstanding requests 
consistent.
-    Map<String, CompletableFuture<BeamFnApi.InstructionResponse>> 
outstandingRequestsCopy =
-        new ConcurrentHashMap<>(outstandingRequests);
-    outstandingRequests.clear();
+    try {
+      // Make a copy of the map to make the view of the outstanding requests 
consistent.
+      Map<String, CompletableFuture<BeamFnApi.InstructionResponse>> 
outstandingRequestsCopy =
+          new ConcurrentHashMap<>(outstandingRequests);
+      outstandingRequests.clear();
 
-    if (outstandingRequestsCopy.isEmpty()) {
-      requestReceiver.onCompleted();
-      return;
-    }
-    requestReceiver.onError(
-        new 
StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage())));
-
-    LOG.error(
-        "{} closed, clearing outstanding requests {}",
-        FnApiControlClient.class.getSimpleName(),
-        outstandingRequestsCopy);
-    for (CompletableFuture<BeamFnApi.InstructionResponse> outstandingRequest :
-        outstandingRequestsCopy.values()) {
-      outstandingRequest.completeExceptionally(cause);
+      if (outstandingRequestsCopy.isEmpty()) {
+        requestReceiver.onCompleted();
+        return;
+      }
+      requestReceiver.onError(
+          new 
StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage())));
+
+      LOG.error(
+          "{} closed, clearing outstanding requests {}",
+          FnApiControlClient.class.getSimpleName(),
+          outstandingRequestsCopy);
+      for (CompletableFuture<BeamFnApi.InstructionResponse> outstandingRequest 
:
+          outstandingRequestsCopy.values()) {
+        outstandingRequest.completeExceptionally(cause);
+      }
+    } finally {
+      for (Consumer<FnApiControlClient> onCloseListener : onCloseListeners) {
+        onCloseListener.accept(this);
+      }
     }
   }
 
+  public void onClose(Consumer<FnApiControlClient> onCloseListener) {
+    onCloseListeners.add(onCloseListener);
+  }
+
   /**
    * A private view of this class as a {@link StreamObserver} for connecting 
as a gRPC listener.
    */
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
index 5a82a32a0b4..aa4fe5c058b 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
@@ -25,6 +25,7 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.HeaderAccessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,10 +36,13 @@
 
   private final BlockingQueue<FnApiControlClient> clientPool;
   private final Collection<FnApiControlClient> vendedClients = new 
CopyOnWriteArrayList<>();
+  private final HeaderAccessor headerAccessor;
   private AtomicBoolean closed = new AtomicBoolean();
 
-  private FnApiControlClientPoolService(BlockingQueue<FnApiControlClient> 
clientPool) {
+  private FnApiControlClientPoolService(
+      BlockingQueue<FnApiControlClient> clientPool, HeaderAccessor 
headerAccessor) {
     this.clientPool = clientPool;
+    this.headerAccessor = headerAccessor;
   }
 
   /**
@@ -49,8 +53,8 @@ private 
FnApiControlClientPoolService(BlockingQueue<FnApiControlClient> clientPo
    * That consumer is responsible for closing the clients when they are no 
longer needed.
    */
   public static FnApiControlClientPoolService offeringClientsToPool(
-      BlockingQueue<FnApiControlClient> clientPool) {
-    return new FnApiControlClientPoolService(clientPool);
+      BlockingQueue<FnApiControlClient> clientPool, HeaderAccessor 
headerAccessor) {
+    return new FnApiControlClientPoolService(clientPool, headerAccessor);
   }
 
   /**
@@ -63,8 +67,9 @@ public static FnApiControlClientPoolService 
offeringClientsToPool(
   @Override
   public StreamObserver<BeamFnApi.InstructionResponse> control(
       StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
-    LOGGER.info("Beam Fn Control client connected.");
-    FnApiControlClient newClient = 
FnApiControlClient.forRequestObserver(requestObserver);
+    LOGGER.info("Beam Fn Control client connected with id {}", 
headerAccessor.getSdkWorkerId());
+    FnApiControlClient newClient =
+        FnApiControlClient.forRequestObserver(headerAccessor.getSdkWorkerId(), 
requestObserver);
     try {
       // Add the client to the pool of vended clients before making it 
available - we should close
       // the client when we close even if no one has picked it up yet. This 
can occur after the
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java
index 20ad69a9759..dd7023651c5 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.SynchronousQueue;
 import java.util.function.Supplier;
 import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.HeaderAccessor;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 
 /**
@@ -39,15 +40,18 @@
 
   private final Collection<SdkHarnessClient> activeClients;
 
-  public static SdkHarnessClientControlService create(Supplier<FnDataService> 
dataService) {
-    return new SdkHarnessClientControlService(dataService);
+  public static SdkHarnessClientControlService create(
+      Supplier<FnDataService> dataService, HeaderAccessor headerAccessor) {
+    return new SdkHarnessClientControlService(dataService, headerAccessor);
   }
 
-  private SdkHarnessClientControlService(Supplier<FnDataService> dataService) {
+  private SdkHarnessClientControlService(
+      Supplier<FnDataService> dataService, HeaderAccessor headerAccessor) {
     this.dataService = dataService;
     activeClients = new ConcurrentLinkedQueue<>();
     pendingClients = new SynchronousQueue<>();
-    clientPoolService = 
FnApiControlClientPoolService.offeringClientsToPool(pendingClients);
+    clientPoolService =
+        FnApiControlClientPoolService.offeringClientsToPool(pendingClients, 
headerAccessor);
   }
 
   public SdkHarnessClient getClient() {
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java
new file mode 100644
index 00000000000..a5851d60ef0
--- /dev/null
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.beam.runners.fnexecution;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Tests for {@link GrpcContextHeaderAccessorProvider}. */
+@RunWith(JUnit4.class)
+public class GrpcContextHeaderAccessorProviderTest {
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testWorkerIdOnConnect() throws Exception {
+    final String worker1 = "worker1";
+    CompletableFuture<String> workerId = new CompletableFuture<>();
+    Consumer<StreamObserver<Elements>> consumer =
+        elementsStreamObserver ->
+            workerId.complete(
+                
GrpcContextHeaderAccessorProvider.getHeaderAccessor().getSdkWorkerId());
+    TestDataService testService = new 
TestDataService(Mockito.mock(StreamObserver.class), consumer);
+    ApiServiceDescriptor serviceDescriptor =
+        ApiServiceDescriptor.newBuilder().setUrl("testServer").build();
+    Server server = InProcessServerFactory.create().create(testService, 
serviceDescriptor);
+    final Metadata.Key<String> workerIdKey =
+        Metadata.Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER);
+    Channel channel =
+        InProcessChannelBuilder.forName(serviceDescriptor.getUrl())
+            .intercept(
+                new ClientInterceptor() {
+                  @Override
+                  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+                      MethodDescriptor<ReqT, RespT> method, CallOptions 
callOptions, Channel next) {
+                    ClientCall<ReqT, RespT> call = next.newCall(method, 
callOptions);
+                    return new SimpleForwardingClientCall<ReqT, RespT>(call) {
+                      @Override
+                      public void start(
+                          ClientCall.Listener<RespT> responseListener, 
Metadata headers) {
+                        headers.put(workerIdKey, worker1);
+                        super.start(responseListener, headers);
+                      }
+                    };
+                  }
+                })
+            .build();
+    BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub(channel);
+    stub.data(Mockito.mock(StreamObserver.class));
+    server.shutdown();
+
+    Assert.assertEquals(worker1, workerId.get());
+  }
+
+  /** A test gRPC service that uses the provided inbound observer for all 
clients. */
+  private static class TestDataService extends 
BeamFnDataGrpc.BeamFnDataImplBase {
+    private final StreamObserver<BeamFnApi.Elements> inboundObserver;
+    private final Consumer<StreamObserver<Elements>> consumer;
+
+    private TestDataService(
+        StreamObserver<BeamFnApi.Elements> inboundObserver,
+        Consumer<StreamObserver<BeamFnApi.Elements>> consumer) {
+      this.inboundObserver = inboundObserver;
+      this.consumer = consumer;
+    }
+
+    @Override
+    public StreamObserver<BeamFnApi.Elements> data(
+        StreamObserver<BeamFnApi.Elements> outboundObserver) {
+      consumer.accept(outboundObserver);
+      return inboundObserver;
+    }
+  }
+}
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
index 530b919187a..137a0283c89 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
@@ -72,7 +72,8 @@ protected void before() throws IOException, 
InterruptedException {
     executor = Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setDaemon(true).build());
     SynchronousQueue<FnApiControlClient> clientPool = new SynchronousQueue<>();
     FnApiControlClientPoolService clientPoolService =
-        FnApiControlClientPoolService.offeringClientsToPool(clientPool);
+        FnApiControlClientPoolService.offeringClientsToPool(
+            clientPool, GrpcContextHeaderAccessorProvider.getHeaderAccessor());
 
     loggingServer =
         GrpcFnServer.allocatePortAndCreateFor(
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
index 98f258fcb41..d08bcfc1edf 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
@@ -34,6 +34,7 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
+import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.InProcessServerFactory;
 import org.apache.beam.sdk.util.MoreFutures;
@@ -52,7 +53,8 @@
   // for matching incoming connections and server threads is likely.
   private final BlockingQueue<FnApiControlClient> pool = new 
LinkedBlockingQueue<>();
   private final FnApiControlClientPoolService controlService =
-      FnApiControlClientPoolService.offeringClientsToPool(pool);
+      FnApiControlClientPoolService.offeringClientsToPool(
+          pool, GrpcContextHeaderAccessorProvider.getHeaderAccessor());
   private GrpcFnServer<FnApiControlClientPoolService> server;
   private BeamFnControlGrpc.BeamFnControlStub stub;
 
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java
index e26e426e605..fec1cb56dd8 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java
@@ -27,6 +27,7 @@
 import io.grpc.stub.StreamObserver;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
@@ -38,6 +39,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
 /** Unit tests for {@link FnApiControlClient}. */
@@ -52,7 +54,7 @@
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
-    client = FnApiControlClient.forRequestObserver(mockObserver);
+    client = FnApiControlClient.forRequestObserver("DUMMY", mockObserver);
   }
 
   @Test
@@ -158,4 +160,19 @@ public void testCloseCancelsOutstanding() throws Exception 
{
     thrown.expectMessage("closed");
     MoreFutures.get(responseFuture);
   }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testOnCloseMultipleListener() throws Exception {
+    Consumer<FnApiControlClient> mockConsumer1 = Mockito.mock(Consumer.class);
+    Consumer<FnApiControlClient> mockConsumer2 = Mockito.mock(Consumer.class);
+
+    client.onClose(mockConsumer1);
+    client.onClose(mockConsumer2);
+
+    client.close();
+
+    verify(mockConsumer1).accept(client);
+    verify(mockConsumer2).accept(client);
+  }
 }
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 962cb790b27..c9ec7a4f050 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -47,6 +47,7 @@
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.FusedPipeline;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.InProcessServerFactory;
 import 
org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
@@ -110,7 +111,9 @@ public void setup() throws Exception {
     BlockingQueue<FnApiControlClient> clientPool = new SynchronousQueue<>();
     controlServer =
         GrpcFnServer.allocatePortAndCreateFor(
-            FnApiControlClientPoolService.offeringClientsToPool(clientPool), 
serverFactory);
+            FnApiControlClientPoolService.offeringClientsToPool(
+                clientPool, 
GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
+            serverFactory);
 
     // Create the SDK harness, and wait until it connects
     sdkHarnessExecutor = Executors.newSingleThreadExecutor(threadFactory);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 87171)
    Time Spent: 6h 50m  (was: 6h 40m)

> Python Fnapi - Support Multiple SDK workers on a single VM
> ----------------------------------------------------------
>
>                 Key: BEAM-3418
>                 URL: https://issues.apache.org/jira/browse/BEAM-3418
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-harness
>            Reporter: Ankur Goenka
>            Assignee: Ankur Goenka
>            Priority: Major
>              Labels: performance, portability
>          Time Spent: 6h 50m
>  Remaining Estimate: 0h
>
> Support multiple python SDK process on a VM to fully utilize a machine.
> Each SDK Process will work in isolation and interact with Runner HarnessĀ 
> independently.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to