[ 
https://issues.apache.org/jira/browse/BEAM-4267?focusedWorklogId=104770&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104770
 ]

ASF GitHub Bot logged work on BEAM-4267:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/May/18 19:54
            Start Date: 22/May/18 19:54
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #5392: [BEAM-4267] 
JobBundleFactory that uses Docker-backed environments
URL: https://github.com/apache/beam/pull/5392
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/java-fn-execution/build.gradle 
b/runners/java-fn-execution/build.gradle
index b7c4a439526..845c6b28ed2 100644
--- a/runners/java-fn-execution/build.gradle
+++ b/runners/java-fn-execution/build.gradle
@@ -22,6 +22,7 @@ description = "Apache Beam :: Runners :: Java Fn Execution"
 
 dependencies {
   compile library.java.guava
+  compile library.java.findbugs_annotations
   compile project(path: ":beam-runners-core-construction-java", configuration: 
"shadow")
   shadow project(path: ":beam-model-pipeline", configuration: "shadow")
   shadow project(path: ":beam-model-fn-execution", configuration: "shadow")
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
index 6db91ad9732..6ebedeccd8c 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -36,7 +36,12 @@
 public abstract class ServerFactory {
   /** Create a default {@link ServerFactory}. */
   public static ServerFactory createDefault() {
-    return new InetSocketAddressServerFactory();
+    return new InetSocketAddressServerFactory(UrlFactory.createDefault());
+  }
+
+  /** Create a {@link ServerFactory} that uses the given url factory. */
+  public static ServerFactory createWithUrlFactory(UrlFactory urlFactory) {
+    return new InetSocketAddressServerFactory(urlFactory);
   }
 
   /**
@@ -62,7 +67,11 @@ public abstract Server create(
    * <p>The server is created listening any open port on "localhost".
    */
   public static class InetSocketAddressServerFactory extends ServerFactory {
-    private InetSocketAddressServerFactory() {}
+    private final UrlFactory urlFactory;
+
+    private InetSocketAddressServerFactory(UrlFactory urlFactory) {
+      this.urlFactory = urlFactory;
+    }
 
     @Override
     public Server allocatePortAndCreate(
@@ -70,8 +79,7 @@ public Server allocatePortAndCreate(
         throws IOException {
       InetSocketAddress address = new 
InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
       Server server = createServer(service, address);
-      apiServiceDescriptor.setUrl(
-          HostAndPort.fromParts(address.getHostName(), 
server.getPort()).toString());
+      apiServiceDescriptor.setUrl(urlFactory.createUrl(address.getHostName(), 
server.getPort()));
       return server;
     }
 
@@ -104,4 +112,17 @@ private static Server createServer(BindableService 
service, InetSocketAddress so
       return server;
     }
   }
+
+  /**
+   * Factory that constructs client-accessible URLs from a local server 
address and port. Necessary
+   * when clients access server from a different networking context.
+   */
+  @FunctionalInterface
+  public interface UrlFactory {
+    String createUrl(String address, int port);
+
+    static UrlFactory createDefault() {
+      return (host, port) -> HostAndPort.fromParts(host, port).toString();
+    }
+  }
 }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
new file mode 100644
index 00000000000..dff269cba9d
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.net.HostAndPort;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
+import org.apache.beam.model.fnexecution.v1.ProvisionApi.ProvisionInfo;
+import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
+import 
org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
+import 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient.BundleProcessor;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
+import 
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
+import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link JobBundleFactory} that uses a {@link DockerEnvironmentFactory} for 
environment
+ * management. Note that returned {@link StageBundleFactory stage bundle 
factories} are not
+ * thread-safe. Instead, a new stage factory should be created for each client.
+ */
+@ThreadSafe
+public class DockerJobBundleFactory implements JobBundleFactory {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DockerJobBundleFactory.class);
+
+  // TODO: This host name seems to change with every other Docker release. Do 
we attempt to keep up
+  // or attempt to document the supported Docker version(s)?
+  private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal";
+
+  private final IdGenerator stageIdGenerator;
+  private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
+  private final GrpcFnServer<GrpcLoggingService> loggingServer;
+  private final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
+  private final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
+
+  private final LoadingCache<Environment, WrappedSdkHarnessClient> 
environmentCache;
+
+  public static DockerJobBundleFactory create(ArtifactSource artifactSource) 
throws Exception {
+    ServerFactory serverFactory = getServerFactory();
+    IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
+    ControlClientPool clientPool = MapControlClientPool.create();
+
+    GrpcFnServer<FnApiControlClientPoolService> controlServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            FnApiControlClientPoolService.offeringClientsToPool(
+                clientPool.getSink(), 
GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
+            serverFactory);
+    GrpcFnServer<GrpcLoggingService> loggingServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), 
serverFactory);
+    // TODO: Wire in artifact retrieval service once implemented.
+    GrpcFnServer<ArtifactRetrievalService> retrievalServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            new UnimplementedArtifactRetrievalService(), serverFactory);
+    GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            
StaticGrpcProvisionService.create(ProvisionInfo.newBuilder().build()), 
serverFactory);
+    DockerEnvironmentFactory environmentFactory =
+        DockerEnvironmentFactory.forServices(
+            controlServer,
+            loggingServer,
+            retrievalServer,
+            provisioningServer,
+            clientPool.getSource(),
+            IdGenerators.incrementingLongs());
+    return new DockerJobBundleFactory(
+        environmentFactory,
+        serverFactory,
+        stageIdGenerator,
+        controlServer,
+        loggingServer,
+        retrievalServer,
+        provisioningServer);
+  }
+
+  @VisibleForTesting
+  DockerJobBundleFactory(
+      DockerEnvironmentFactory environmentFactory,
+      ServerFactory serverFactory,
+      IdGenerator stageIdGenerator,
+      GrpcFnServer<FnApiControlClientPoolService> controlServer,
+      GrpcFnServer<GrpcLoggingService> loggingServer,
+      GrpcFnServer<ArtifactRetrievalService> retrievalServer,
+      GrpcFnServer<StaticGrpcProvisionService> provisioningServer) {
+    this.stageIdGenerator = stageIdGenerator;
+    this.controlServer = controlServer;
+    this.loggingServer = loggingServer;
+    this.retrievalServer = retrievalServer;
+    this.provisioningServer = provisioningServer;
+    this.environmentCache =
+        CacheBuilder.newBuilder()
+            .weakValues()
+            .removalListener(
+                ((RemovalNotification<Environment, WrappedSdkHarnessClient> 
notification) -> {
+                  LOG.debug("Cleaning up for environment {}", 
notification.getKey().getUrl());
+                  try {
+                    notification.getValue().close();
+                  } catch (Exception e) {
+                    LOG.warn(
+                        String.format("Error cleaning up environment %s", 
notification.getKey()),
+                        e);
+                  }
+                }))
+            .build(
+                new CacheLoader<Environment, WrappedSdkHarnessClient>() {
+                  @Override
+                  public WrappedSdkHarnessClient load(Environment environment) 
throws Exception {
+                    RemoteEnvironment remoteEnvironment =
+                        environmentFactory.createEnvironment(environment);
+                    return WrappedSdkHarnessClient.wrapping(remoteEnvironment, 
serverFactory);
+                  }
+                });
+  }
+
+  @Override
+  public <T> StageBundleFactory<T> forStage(ExecutableStage executableStage) {
+    WrappedSdkHarnessClient wrappedClient =
+        environmentCache.getUnchecked(executableStage.getEnvironment());
+    ExecutableProcessBundleDescriptor processBundleDescriptor;
+    try {
+      processBundleDescriptor =
+          ProcessBundleDescriptors.fromExecutableStage(
+              stageIdGenerator.getId(),
+              executableStage,
+              wrappedClient.getDataServer().getApiServiceDescriptor());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return SimpleStageBundleFactory.create(wrappedClient, 
processBundleDescriptor);
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Clear the cache. This closes all active environments.
+    environmentCache.invalidateAll();
+    environmentCache.cleanUp();
+
+    // Tear down common servers.
+    controlServer.close();
+    loggingServer.close();
+    retrievalServer.close();
+    provisioningServer.close();
+  }
+
+  private static ServerFactory getServerFactory() {
+    switch (getPlatform()) {
+      case LINUX:
+        return ServerFactory.createDefault();
+      case MAC:
+        // NOTE: Deployment on Macs is intended for local development. As of 
18.03, Docker-for-Mac
+        // does not implement host networking (--networking=host is 
effectively a no-op). Instead,
+        // we use a special DNS entry that points to the host:
+        // 
https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds
+        // The special hostname has historically changed between versions, so 
this is subject to
+        // breakages and will likely only support the latest version at any 
time.
+        return ServerFactory.createWithUrlFactory(
+            (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, 
port).toString());
+      default:
+        LOG.warn("Unknown Docker platform. Falling back to default server 
factory");
+        return ServerFactory.createDefault();
+    }
+  }
+
+  private static Platform getPlatform() {
+    String osName = System.getProperty("os.name").toLowerCase();
+    // TODO: Make this more robust?
+    if (osName.startsWith("mac")) {
+      return Platform.MAC;
+    } else if (osName.startsWith("linux")) {
+      return Platform.LINUX;
+    }
+    return Platform.OTHER;
+  }
+
+  private static class SimpleStageBundleFactory<InputT> implements 
StageBundleFactory<InputT> {
+
+    private final BundleProcessor<InputT> processor;
+    private final ExecutableProcessBundleDescriptor processBundleDescriptor;
+
+    // Store the wrapped client in order to keep a live reference into the 
cache.
+    @SuppressFBWarnings private WrappedSdkHarnessClient wrappedClient;
+
+    static <InputT> SimpleStageBundleFactory<InputT> create(
+        WrappedSdkHarnessClient wrappedClient,
+        ExecutableProcessBundleDescriptor processBundleDescriptor) {
+      @SuppressWarnings("unchecked")
+      BundleProcessor<InputT> processor =
+          wrappedClient
+              .getClient()
+              .getProcessor(
+                  processBundleDescriptor.getProcessBundleDescriptor(),
+                  (RemoteInputDestination) 
processBundleDescriptor.getRemoteInputDestination(),
+                  wrappedClient.getStateServer().getService());
+      return new SimpleStageBundleFactory<>(processBundleDescriptor, 
processor, wrappedClient);
+    }
+
+    SimpleStageBundleFactory(
+        ExecutableProcessBundleDescriptor processBundleDescriptor,
+        BundleProcessor<InputT> processor,
+        WrappedSdkHarnessClient wrappedClient) {
+      this.processBundleDescriptor = processBundleDescriptor;
+      this.processor = processor;
+      this.wrappedClient = wrappedClient;
+    }
+
+    @Override
+    public RemoteBundle<InputT> getBundle(
+        OutputReceiverFactory outputReceiverFactory, StateRequestHandler 
stateRequestHandler)
+        throws Exception {
+      // TODO: Consider having BundleProcessor#newBundle take in an 
OutputReceiverFactory rather
+      // than constructing the receiver map here. Every bundle factory will 
need this.
+      ImmutableMap.Builder<Target, RemoteOutputReceiver<?>> outputReceivers =
+          ImmutableMap.builder();
+      for (Map.Entry<Target, Coder<WindowedValue<?>>> targetCoder :
+          processBundleDescriptor.getOutputTargetCoders().entrySet()) {
+        Target target = targetCoder.getKey();
+        Coder<WindowedValue<?>> coder = targetCoder.getValue();
+        String bundleOutputPCollection =
+            Iterables.getOnlyElement(
+                processBundleDescriptor
+                    .getProcessBundleDescriptor()
+                    
.getTransformsOrThrow(target.getPrimitiveTransformReference())
+                    .getInputsMap()
+                    .values());
+        FnDataReceiver<WindowedValue<?>> outputReceiver =
+            outputReceiverFactory.create(bundleOutputPCollection);
+        outputReceivers.put(target, RemoteOutputReceiver.of(coder, 
outputReceiver));
+      }
+      return processor.newBundle(outputReceivers.build(), stateRequestHandler);
+    }
+
+    @Override
+    public void close() throws Exception {
+      // Clear reference to encourage cache eviction. Values are weakly 
referenced.
+      wrappedClient = null;
+    }
+  }
+
+  /**
+   * Holder for an {@link SdkHarnessClient} along with its associated state 
and data servers. As of
+   * now, there is a 1:1 relationship between data services and harness 
clients. The servers are
+   * packaged here to tie server lifetimes to harness client lifetimes.
+   */
+  private static class WrappedSdkHarnessClient implements AutoCloseable {
+    private final RemoteEnvironment environment;
+    private final ExecutorService executor;
+    // TODO: How should data server lifetime be scoped? It is necessary here 
for now because
+    // SdkHarnessClient requires one at construction.
+    private final GrpcFnServer<GrpcDataService> dataServer;
+    private final GrpcFnServer<GrpcStateService> stateServer;
+    private final SdkHarnessClient client;
+
+    static WrappedSdkHarnessClient wrapping(
+        RemoteEnvironment environment, ServerFactory serverFactory) throws 
Exception {
+      ExecutorService executor = Executors.newCachedThreadPool();
+      GrpcFnServer<GrpcDataService> dataServer =
+          
GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(executor), 
serverFactory);
+      GrpcFnServer<GrpcStateService> stateServer =
+          GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), 
serverFactory);
+      SdkHarnessClient client =
+          SdkHarnessClient.usingFnApiClient(
+              environment.getInstructionRequestHandler(), 
dataServer.getService());
+      return new WrappedSdkHarnessClient(environment, executor, dataServer, 
stateServer, client);
+    }
+
+    private WrappedSdkHarnessClient(
+        RemoteEnvironment environment,
+        ExecutorService executor,
+        GrpcFnServer<GrpcDataService> dataServer,
+        GrpcFnServer<GrpcStateService> stateServer,
+        SdkHarnessClient client) {
+      this.executor = executor;
+      this.environment = environment;
+      this.dataServer = dataServer;
+      this.stateServer = stateServer;
+      this.client = client;
+    }
+
+    SdkHarnessClient getClient() {
+      return client;
+    }
+
+    GrpcFnServer<GrpcStateService> getStateServer() {
+      return stateServer;
+    }
+
+    GrpcFnServer<GrpcDataService> getDataServer() {
+      return dataServer;
+    }
+
+    @Override
+    public void close() throws Exception {
+      try (AutoCloseable stateServerCloser = stateServer;
+          AutoCloseable dataServerCloser = dataServer;
+          AutoCloseable envCloser = environment;
+          AutoCloseable executorCloser = executor::shutdown) {
+        // Wrap resources in try-with-resources to ensure all are cleaned up.
+      }
+      // TODO: Wait for executor shutdown?
+    }
+  }
+
+  private enum Platform {
+    MAC,
+    LINUX,
+    OTHER,
+  }
+
+  // TODO: Remove this once a real artifact retrieval service has been wired 
in.
+  private static class UnimplementedArtifactRetrievalService
+      extends ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceImplBase
+      implements ArtifactRetrievalService {
+
+    @Override
+    public void close() throws Exception {}
+  }
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
index d9fa6e1d334..1ea8ee95884 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
@@ -190,7 +190,7 @@ public static ExecutableProcessBundleDescriptor of(
      * Get the {@link RemoteInputDestination} that input data are sent to the 
{@link
      * ProcessBundleDescriptor} over.
      */
-    public abstract RemoteInputDestination<? super WindowedValue<?>> 
getRemoteInputDestination();
+    public abstract RemoteInputDestination<WindowedValue<?>> 
getRemoteInputDestination();
 
     /**
      * Get all of the targets materialized by this {@link 
ExecutableProcessBundleDescriptor} and the
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
index 0eb5d6fc54b..a6b4bc86e52 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
@@ -32,7 +32,6 @@
 import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
 import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
 import org.apache.beam.sdk.fn.IdGenerator;
-import org.apache.beam.sdk.fn.IdGenerators;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,14 +46,15 @@
 
   /**
    * Returns a {@link DockerEnvironmentFactory} for the provided {@link 
GrpcFnServer servers} using
-   * the default {@link DockerCommand} and {@link IdGenerators}.
+   * the default {@link DockerCommand}.
    */
   public static DockerEnvironmentFactory forServices(
       GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
       GrpcFnServer<GrpcLoggingService> loggingServiceServer,
       GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
       GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
-      ControlClientPool.Source clientSource) {
+      ControlClientPool.Source clientSource,
+      IdGenerator idGenerator) {
     return forServicesWithDocker(
         DockerCommand.getDefault(),
         controlServiceServer,
@@ -62,7 +62,7 @@ public static DockerEnvironmentFactory forServices(
         retrievalServiceServer,
         provisioningServiceServer,
         clientSource,
-        IdGenerators.incrementingLongs());
+        idGenerator);
   }
 
   static DockerEnvironmentFactory forServicesWithDocker(
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
index 6ba9da5ffb9..8cb4cb6b682 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
@@ -23,6 +23,7 @@
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -42,6 +43,7 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
 import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
 import org.apache.beam.sdk.fn.test.TestStreams;
 import org.junit.Test;
@@ -59,7 +61,7 @@
       .build();
 
   @Test
-  public void testCreatingDefaultServer() throws Exception {
+  public void defaultServerWorks() throws Exception {
     Endpoints.ApiServiceDescriptor apiServiceDescriptor =
         runTestUsing(ServerFactory.createDefault(), 
ManagedChannelFactory.createDefault());
     HostAndPort hostAndPort = 
HostAndPort.fromString(apiServiceDescriptor.getUrl());
@@ -69,6 +71,19 @@ public void testCreatingDefaultServer() throws Exception {
     assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536)));
   }
 
+  @Test
+  public void usesUrlFactory() throws Exception {
+    ServerFactory serverFactory = ServerFactory.createWithUrlFactory((host, 
port) -> "foo");
+    CallStreamObserver<Elements> observer =
+        TestStreams.withOnNext((Elements unused) -> {}).withOnCompleted(() -> 
{}).build();
+    TestDataService service = new TestDataService(observer);
+    ApiServiceDescriptor.Builder descriptorBuilder = 
ApiServiceDescriptor.newBuilder();
+    Server server = serverFactory.allocatePortAndCreate(service, 
descriptorBuilder);
+    // Immediately terminate server. We don't actually use it here.
+    server.shutdown();
+    assertThat(descriptorBuilder.getUrl(), is("foo"));
+  }
+
   private Endpoints.ApiServiceDescriptor runTestUsing(
       ServerFactory serverFactory, ManagedChannelFactory channelFactory) 
throws Exception {
     Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactoryTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactoryTest.java
new file mode 100644
index 00000000000..dbf3deaa363
--- /dev/null
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactoryTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
+import org.apache.beam.runners.core.construction.ModelCoders;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import 
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link DockerJobBundleFactory}. */
+@RunWith(JUnit4.class)
+public class DockerJobBundleFactoryTest {
+
+  @Mock private DockerEnvironmentFactory envFactory;
+  @Mock private RemoteEnvironment remoteEnvironment;
+  @Mock private InstructionRequestHandler instructionHandler;
+  @Mock private ServerFactory serverFactory;
+  @Mock GrpcFnServer<FnApiControlClientPoolService> controlServer;
+  @Mock GrpcFnServer<GrpcLoggingService> loggingServer;
+  @Mock GrpcFnServer<ArtifactRetrievalService> retrievalServer;
+  @Mock GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
+
+  private final Environment environment = 
Environment.newBuilder().setUrl("env-url").build();
+  private final IdGenerator stageIdGenerator = 
IdGenerators.incrementingLongs();
+  private final InstructionResponse instructionResponse =
+      
InstructionResponse.newBuilder().setInstructionId("instruction-id").build();
+
+  @Before
+  public void setUpMocks() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    
when(envFactory.createEnvironment(environment)).thenReturn(remoteEnvironment);
+    
when(remoteEnvironment.getInstructionRequestHandler()).thenReturn(instructionHandler);
+    when(instructionHandler.handle(any()))
+        .thenReturn(CompletableFuture.completedFuture(instructionResponse));
+  }
+
+  @Test
+  public void createsCorrectEnvironment() throws Exception {
+    try (DockerJobBundleFactory bundleFactory =
+        new DockerJobBundleFactory(
+            envFactory,
+            serverFactory,
+            stageIdGenerator,
+            controlServer,
+            loggingServer,
+            retrievalServer,
+            provisioningServer)) {
+      bundleFactory.forStage(getExecutableStage(environment));
+      verify(envFactory).createEnvironment(environment);
+    }
+  }
+
+  @Test
+  public void closesEnvironmentOnCleanup() throws Exception {
+    DockerJobBundleFactory bundleFactory =
+        new DockerJobBundleFactory(
+            envFactory,
+            serverFactory,
+            stageIdGenerator,
+            controlServer,
+            loggingServer,
+            retrievalServer,
+            provisioningServer);
+    try (AutoCloseable unused = bundleFactory) {
+      bundleFactory.forStage(getExecutableStage(environment));
+    }
+    verify(remoteEnvironment).close();
+  }
+
+  @Test
+  public void cachesEnvironment() throws Exception {
+    try (DockerJobBundleFactory bundleFactory =
+        new DockerJobBundleFactory(
+            envFactory,
+            serverFactory,
+            stageIdGenerator,
+            controlServer,
+            loggingServer,
+            retrievalServer,
+            provisioningServer)) {
+      StageBundleFactory<?> bf1 = 
bundleFactory.forStage(getExecutableStage(environment));
+      StageBundleFactory<?> bf2 = 
bundleFactory.forStage(getExecutableStage(environment));
+      // NOTE: We hang on to stage bundle references to ensure their 
underlying environments are not
+      // garbage collected. For additional safety, we print the factories to 
ensure the referernces
+      // are not optimized away.
+      System.out.println("bundle factory 1:" + bf1);
+      System.out.println("bundle factory 1:" + bf2);
+      verify(envFactory).createEnvironment(environment);
+      verifyNoMoreInteractions(envFactory);
+    }
+  }
+
+  @Test
+  public void doesNotCacheDifferentEnvironments() throws Exception {
+    Environment envFoo = 
Environment.newBuilder().setUrl("foo-env-url").build();
+    RemoteEnvironment remoteEnvFoo = mock(RemoteEnvironment.class);
+    InstructionRequestHandler fooInstructionHandler = 
mock(InstructionRequestHandler.class);
+    when(envFactory.createEnvironment(envFoo)).thenReturn(remoteEnvFoo);
+    
when(remoteEnvFoo.getInstructionRequestHandler()).thenReturn(fooInstructionHandler);
+    // Don't bother creating a distinct instruction response because we don't 
use it here.
+    when(fooInstructionHandler.handle(any()))
+        .thenReturn(CompletableFuture.completedFuture(instructionResponse));
+
+    try (DockerJobBundleFactory bundleFactory =
+        new DockerJobBundleFactory(
+            envFactory,
+            serverFactory,
+            stageIdGenerator,
+            controlServer,
+            loggingServer,
+            retrievalServer,
+            provisioningServer)) {
+      bundleFactory.forStage(getExecutableStage(environment));
+      bundleFactory.forStage(getExecutableStage(envFoo));
+      verify(envFactory).createEnvironment(environment);
+      verify(envFactory).createEnvironment(envFoo);
+      verifyNoMoreInteractions(envFactory);
+    }
+  }
+
+  private static ExecutableStage getExecutableStage(Environment environment) {
+    return ExecutableStage.fromPayload(
+        ExecutableStagePayload.newBuilder()
+            .setInput("input-pc")
+            .setEnvironment(environment)
+            .setComponents(
+                Components.newBuilder()
+                    .putPcollections(
+                        "input-pc",
+                        PCollection.newBuilder()
+                            .setWindowingStrategyId("windowing-strategy")
+                            .setCoderId("coder-id")
+                            .build())
+                    .putWindowingStrategies(
+                        "windowing-strategy",
+                        
WindowingStrategy.newBuilder().setWindowCoderId("coder-id").build())
+                    .putCoders(
+                        "coder-id",
+                        Coder.newBuilder()
+                            .setSpec(
+                                SdkFunctionSpec.newBuilder()
+                                    .setSpec(
+                                        FunctionSpec.newBuilder()
+                                            
.setUrn(ModelCoders.INTERVAL_WINDOW_CODER_URN)
+                                            .build())
+                                    .build())
+                            .build())
+                    .build())
+            .build());
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 104770)
    Time Spent: 1h 40m  (was: 1.5h)

> Implement a reusable library that can run an ExecutableStage with a given 
> Environment
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-4267
>                 URL: https://issues.apache.org/jira/browse/BEAM-4267
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Axel Magnuson
>            Assignee: Ben Sidhom
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Build off of the interfaces introduced in 
> [BEAM-3327|https://github.com/apache/beam/pull/5152] to provide a reusable 
> execution library to runners.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to