[ 
https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=91528&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91528
 ]

ASF GitHub Bot logged work on BEAM-3327:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Apr/18 22:50
            Start Date: 16/Apr/18 22:50
    Worklog Time Spent: 10m 
      Work Description: bsidhom closed pull request #4751: [BEAM-3327] 
Implement simple Docker container manager
URL: https://github.com/apache/beam/pull/4751
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/java-fn-execution/build.gradle 
b/runners/java-fn-execution/build.gradle
index b1aa9e8c0ca..c50ef34a1a2 100644
--- a/runners/java-fn-execution/build.gradle
+++ b/runners/java-fn-execution/build.gradle
@@ -48,3 +48,18 @@ dependencies {
   testCompile library.java.mockito_core
   testCompile library.java.slf4j_simple
 }
+
+test {
+  useJUnit {
+    // Exclude tests that need Docker.
+    excludeCategories 
"org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker"
+  }
+}
+
+task testDocker(type: Test) {
+  group = "Verification"
+  description = "Runs Docker tests"
+  useJUnit {
+    includeCategories 
"org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker"
+  }
+}
diff --git a/runners/java-fn-execution/pom.xml 
b/runners/java-fn-execution/pom.xml
index 5096b299719..7f09a48fb9a 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -32,6 +32,53 @@
 
   <packaging>jar</packaging>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <!-- Do not run Docker tests with unit tests. -->
+              <excludedGroups>
+                
org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker
+              </excludedGroups>
+            </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>docker-tests</id>
+      <activation><activeByDefault>false</activeByDefault></activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>docker-tests</id>
+                <phase>integration-test</phase>
+                <goals>
+                  <goal>test</goal>
+                </goals>
+                <configuration>
+                  <groups>
+                    
org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker
+                  </groups>
+                  <!-- TODO: Enable this when we figure out how to run the 
tests
+                  <failIfNoTests>true</failIfNoTests>
+                   -->
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java
new file mode 100644
index 00000000000..1f95d8c8922
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
+
+/**
+ * A {@link RemoteEnvironment} that talks to a Docker container. Accessors are 
thread-compatible.
+ */
+class DockerContainerEnvironment implements RemoteEnvironment {
+
+  static DockerContainerEnvironment create(DockerWrapper docker,
+      Environment environment, String containerId, SdkHarnessClient client) {
+    return new DockerContainerEnvironment(docker, environment, containerId, 
client);
+  }
+
+  private final DockerWrapper docker;
+  private final Environment environment;
+  private final String containerId;
+  private final SdkHarnessClient client;
+
+  private DockerContainerEnvironment(DockerWrapper docker, Environment 
environment,
+      String containerId, SdkHarnessClient client) {
+    this.docker = docker;
+    this.environment = environment;
+    this.containerId = containerId;
+    this.client = client;
+  }
+
+  @Override
+  public Environment getEnvironment() {
+    return environment;
+  }
+
+  @Override
+  public SdkHarnessClient getClient() {
+    return client;
+  }
+
+  /**
+   * Closes this remote docker environment. The associated {@link 
SdkHarnessClient} must not be
+   * used after calling this.
+   */
+  @Override
+  public void close() throws Exception {
+    getClient().close();
+    docker.killContainer(containerId);
+  }
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerWrapper.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerWrapper.java
new file mode 100644
index 00000000000..ee25c8aeb2c
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerWrapper.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableList;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * A docker command wrapper. Simplifies communications with the Docker daemon.
+ */
+class DockerWrapper {
+  // TODO: Should we require 64-character container ids? Docker technically 
allows abbreviated ids,
+  // but we _should_ always capture full ids.
+  private static final Pattern CONTAINER_ID_PATTERN = 
Pattern.compile("\\p{XDigit}{64}");
+
+  static DockerWrapper forCommand(String dockerExecutable, Duration 
commandTimeout) {
+    return new DockerWrapper(dockerExecutable, commandTimeout);
+  }
+
+  private final String dockerExecutable;
+  private final Duration commandTimeout;
+
+  private DockerWrapper(String dockerExecutable, Duration commandTimeout) {
+    this.dockerExecutable = dockerExecutable;
+    this.commandTimeout = commandTimeout;
+  }
+
+  /**
+   * Runs the given container image with the given command line arguments. 
Returns the running
+   * container id.
+   */
+  public String runImage(String imageTag, List<String> args) throws 
IOException, TimeoutException,
+      InterruptedException {
+    checkArgument(!imageTag.isEmpty(), "Docker image tag required");
+    // TODO: Validate args?
+    return runShortCommand(ImmutableList.<String>builder()
+        .add(dockerExecutable)
+        .add("run")
+        .add("-d")
+        .add(imageTag)
+        .addAll(args)
+        .build());
+  }
+
+  /**
+   * Kills a docker container by container id.
+   * @throws IOException if an IOException occurs or if the given container id 
does not exist
+   */
+  public void killContainer(String containerId) throws IOException, 
TimeoutException,
+      InterruptedException {
+    checkArgument(containerId != null);
+    checkArgument(CONTAINER_ID_PATTERN.matcher(containerId).matches(),
+        "Container ID must be a 64-character hexadecimal string");
+    runShortCommand(Arrays.asList(dockerExecutable, "kill", containerId));
+  }
+
+  /** Run the given command invocation and return stdout as a String. */
+  private String runShortCommand(List<String> invocation) throws IOException,
+      TimeoutException, InterruptedException {
+    ProcessBuilder pb = new ProcessBuilder(invocation);
+    Process process = pb.start();
+    // TODO: Consider supplying executor service here.
+    CompletableFuture<String> resultString = CompletableFuture.supplyAsync(() 
-> {
+      // NOTE: We do not own the underlying stream and do not close it.
+      BufferedReader reader = new BufferedReader(new 
InputStreamReader(process.getInputStream(),
+          StandardCharsets.UTF_8));
+      return reader.lines().collect(Collectors.joining());
+    });
+    // TODO: Retry on interrupt?
+    boolean processDone = process.waitFor(commandTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
+    if (!processDone) {
+      process.destroy();
+      throw new TimeoutException(
+          String.format("Timed out while waiting for command '%s'",
+              invocation.stream().collect(Collectors.joining(" "))));
+    }
+    int exitCode = process.exitValue();
+    if (exitCode != 0) {
+      throw new IOException(String.format("Received exit code %d for command 
'%s'",
+          exitCode, invocation.stream().collect(Collectors.joining(" "))));
+    }
+    try {
+      // TODO: Consider a stricter timeout.
+      return resultString.get(commandTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      // Recast any exceptions in reading output as IOExceptions.
+      throw new IOException(cause);
+    }
+  }
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/SingletonDockerEnvironmentManager.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/SingletonDockerEnvironmentManager.java
new file mode 100644
index 00000000000..c2df17935db
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/SingletonDockerEnvironmentManager.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import 
org.apache.beam.runners.fnexecution.control.SdkHarnessClientControlService;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+
+/**
+ * An {@link EnvironmentManager} that manages a single docker container. Not thread-safe.
+ *
+ * <p>The container is started on the first call to {@link #getEnvironment(Environment)}; later
+ * calls return the same handle.
+ */
+public class SingletonDockerEnvironmentManager implements EnvironmentManager {
+
+  /**
+   * Creates a manager that starts its container through {@code docker} and passes the URLs of the
+   * given servers to the container as endpoint arguments.
+   */
+  public static SingletonDockerEnvironmentManager forServices(
+      DockerWrapper docker,
+      GrpcFnServer<SdkHarnessClientControlService> controlServiceServer,
+      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer) {
+    return new SingletonDockerEnvironmentManager(docker, controlServiceServer,
+        loggingServiceServer, retrievalServiceServer, provisioningServiceServer);
+  }
+
+  private final DockerWrapper docker;
+  private final GrpcFnServer<SdkHarnessClientControlService> controlServiceServer;
+  private final GrpcFnServer<GrpcLoggingService> loggingServiceServer;
+  private final GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer;
+  private final GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer;
+
+  // Created on the first getEnvironment call and reused afterwards.
+  private RemoteEnvironment dockerEnvironment = null;
+
+  private SingletonDockerEnvironmentManager(
+      DockerWrapper docker,
+      GrpcFnServer<SdkHarnessClientControlService> controlServiceServer,
+      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer) {
+    this.docker = docker;
+    this.controlServiceServer = controlServiceServer;
+    this.loggingServiceServer = loggingServiceServer;
+    this.retrievalServiceServer = retrievalServiceServer;
+    this.provisioningServiceServer = provisioningServiceServer;
+  }
+
+  /**
+   * Retrieve a handle for the given environment. The same environment must be requested every
+   * time. The same remote handle is returned to every caller, so the environment cannot be used
+   * once closed.
+   *
+   * @throws IllegalArgumentException if the URL of {@code environment} differs from the URL of
+   *     the environment this manager was first queried for
+   */
+  @Override
+  public RemoteEnvironment getEnvironment(Environment environment) throws Exception {
+    if (dockerEnvironment == null) {
+      dockerEnvironment = createDockerEnv(environment);
+    } else {
+      checkArgument(
+          environment.getUrl().equals(dockerEnvironment.getEnvironment().getUrl()),
+          "A %s must only be queried for a single %s. Existing %s, Argument %s",
+          SingletonDockerEnvironmentManager.class.getSimpleName(),
+          Environment.class.getSimpleName(),
+          dockerEnvironment.getEnvironment().getUrl(),
+          environment.getUrl());
+    }
+    return dockerEnvironment;
+  }
+
+  /**
+   * Starts a container for the image named by the environment URL and wraps it, together with the
+   * control service's client, in a {@link DockerContainerEnvironment}.
+   */
+  private DockerContainerEnvironment createDockerEnv(Environment environment)
+      throws IOException, TimeoutException, InterruptedException {
+    // TODO: Generate environment id correctly.
+    String environmentId = Long.toString(-123);
+    // NOTE(review): neither temporary directory is deleted anywhere in this class.
+    Path workerPersistentDirectory = Files.createTempDirectory("worker_persistent_directory");
+    Path semiPersistentDirectory = Files.createTempDirectory("semi_persistent_dir");
+    String containerImage = environment.getUrl();
+    // TODO: The default service address will not work for Docker for Mac.
+    String loggingEndpoint = loggingServiceServer.getApiServiceDescriptor().getUrl();
+    String artifactEndpoint = retrievalServiceServer.getApiServiceDescriptor().getUrl();
+    String provisionEndpoint = provisioningServiceServer.getApiServiceDescriptor().getUrl();
+    String controlEndpoint = controlServiceServer.getApiServiceDescriptor().getUrl();
+    // NOTE(review): DockerWrapper#runImage emits "run -d <image> <args...>", so every entry
+    // below, including the "-v"/"--network" docker options and the repeated image name, lands
+    // after the image on the command line. Docker options presumably have to precede the image
+    // to take effect -- TODO confirm and split docker options from container arguments.
+    List<String> args = Arrays.asList(
+        "-v",
+        // TODO: Mac only allows temporary mounts under /tmp by default (as of 17.12).
+        String.format("%s:%s", workerPersistentDirectory, semiPersistentDirectory),
+        // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
+        "--network=host",
+        containerImage,
+        String.format("--id=%s", environmentId),
+        String.format("--logging_endpoint=%s", loggingEndpoint),
+        String.format("--artifact_endpoint=%s", artifactEndpoint),
+        String.format("--provision_endpoint=%s", provisionEndpoint),
+        String.format("--control_endpoint=%s", controlEndpoint),
+        String.format("--semi_persist_dir=%s", semiPersistentDirectory));
+    String containerId = docker.runImage(containerImage, args);
+    return DockerContainerEnvironment.create(docker, environment, containerId,
+        controlServiceServer.getService().getClient());
+  }
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/testing/NeedsDocker.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/testing/NeedsDocker.java
new file mode 100644
index 00000000000..e009ecf934c
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/testing/NeedsDocker.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment.testing;
+
+/**
+ * Category for integration tests that require Docker.
+ *
+ * <p>Intended for use with JUnit's {@code @Category} annotation. The build configuration refers
+ * to this interface by its fully qualified name to include or exclude the tests tagged with it.
+ */
+public interface NeedsDocker {}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/testing/package-info.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/testing/package-info.java
new file mode 100644
index 00000000000..5777a29f39d
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/testing/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Test utilities for the environment management package. */
+package org.apache.beam.runners.fnexecution.environment.testing;
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/DockerWrapperTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/DockerWrapperTest.java
new file mode 100644
index 00000000000..98e8777673c
--- /dev/null
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/DockerWrapperTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DockerWrapper}. Every test here invokes the real {@code docker} executable,
+ * hence the {@link NeedsDocker} category.
+ */
+@Category(NeedsDocker.class)
+@RunWith(JUnit4.class)
+public class DockerWrapperTest {
+
+  /** Builds a wrapper around the {@code docker} command with a ten second command timeout. */
+  private static DockerWrapper getWrapper() {
+    Duration commandTimeout = Duration.ofMillis(10000);
+    return DockerWrapper.forCommand("docker", commandTimeout);
+  }
+
+  /** Starts the {@code hello-world} image and prints the id reported for it. */
+  @Test
+  public void helloWorld() throws Exception {
+    String containerId = getWrapper().runImage("hello-world", Collections.emptyList());
+    System.out.printf("Started container: %s%n", containerId);
+  }
+
+  /** Starts a container that sleeps for a minute and kills it by the id that was returned. */
+  @Test
+  public void killContainer() throws Exception {
+    DockerWrapper wrapper = getWrapper();
+    String sleepingContainerId =
+        wrapper.runImage("debian", Arrays.asList("/bin/bash", "-c", "sleep 60"));
+    wrapper.killContainer(sleepingContainerId);
+  }
+
+}
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/SingletonDockerEnvironmentManagerTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/SingletonDockerEnvironmentManagerTest.java
new file mode 100644
index 00000000000..05bd378a051
--- /dev/null
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/SingletonDockerEnvironmentManagerTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
+import 
org.apache.beam.runners.fnexecution.control.SdkHarnessClientControlService;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link SingletonDockerEnvironmentManager}. Docker and all gRPC servers are mocked, so
+ * no container is actually started.
+ */
+@RunWith(JUnit4.class)
+public class SingletonDockerEnvironmentManagerTest {
+
+  private static final String IMAGE_NAME = "my-image";
+  private static final String CONTAINER_ID =
+      "e4485f0f2b813b63470feacba5fe9cb89699878c095df4124abd320fd5401385";
+  private static final Environment ENVIRONMENT = Environment.newBuilder()
+      .setUrl(IMAGE_NAME)
+      .build();
+  private static final ApiServiceDescriptor SERVICE_DESCRIPTOR =
+      ApiServiceDescriptor.newBuilder()
+          .setUrl("service-url")
+          .build();
+
+  @Mock private DockerWrapper docker;
+
+  @Mock private GrpcFnServer<SdkHarnessClientControlService> controlServiceServer;
+  @Mock private GrpcFnServer<GrpcLoggingService> loggingServiceServer;
+  @Mock private GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer;
+  @Mock private GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer;
+
+  @Mock private SdkHarnessClientControlService clientControlService;
+  @Mock private SdkHarnessClient sdkHarnessClient;
+
+  /** Wires the mocks: every server reports the same descriptor and the control client is fixed. */
+  @Before
+  public void initMocks() {
+    MockitoAnnotations.initMocks(this);
+
+    when(controlServiceServer.getService()).thenReturn(clientControlService);
+    when(clientControlService.getClient()).thenReturn(sdkHarnessClient);
+
+    when(provisioningServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+    when(retrievalServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+    when(loggingServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+    when(controlServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+  }
+
+  /** The handle exposes the control service's client and the requested environment. */
+  @Test
+  public void createsCorrectEnvironment() throws Exception {
+    SingletonDockerEnvironmentManager manager = managerWithStubbedDocker();
+
+    RemoteEnvironment remote = manager.getEnvironment(ENVIRONMENT);
+    assertThat(remote.getClient(), is(sdkHarnessClient));
+    assertThat(remote.getEnvironment(), equalTo(ENVIRONMENT));
+  }
+
+  /** Closing the handle kills the container whose id docker reported. */
+  @Test
+  public void destroysCorrectContainer() throws Exception {
+    SingletonDockerEnvironmentManager manager = managerWithStubbedDocker();
+
+    RemoteEnvironment remote = manager.getEnvironment(ENVIRONMENT);
+    remote.close();
+    verify(docker).killContainer(CONTAINER_ID);
+  }
+
+  /** Asking for an environment with a different URL is rejected. */
+  @Test
+  public void onlyAcceptsSingleEnvironment() throws Exception {
+    SingletonDockerEnvironmentManager manager = managerWithStubbedDocker();
+
+    manager.getEnvironment(ENVIRONMENT);
+    // TODO: Use JUnit assertThrows when available.
+    try {
+      Environment otherEnvironment =
+          ENVIRONMENT.toBuilder().setUrl("other-environment").build();
+      manager.getEnvironment(otherEnvironment);
+      fail("Expected exception");
+    } catch (Exception expected) {
+      assertThat(expected, instanceOf(IllegalArgumentException.class));
+    }
+  }
+
+  /** Stubs docker to report {@code CONTAINER_ID} for the test image and builds a manager. */
+  private SingletonDockerEnvironmentManager managerWithStubbedDocker() throws Exception {
+    when(docker.runImage(Mockito.eq(IMAGE_NAME), Mockito.any())).thenReturn(CONTAINER_ID);
+    return SingletonDockerEnvironmentManager.forServices(
+        docker,
+        controlServiceServer,
+        loggingServiceServer,
+        retrievalServiceServer,
+        provisioningServiceServer);
+  }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 91528)
    Time Spent: 5h 10m  (was: 5h)

> Add abstractions to manage Environment Instance lifecycles.
> -----------------------------------------------------------
>
>                 Key: BEAM-3327
>                 URL: https://issues.apache.org/jira/browse/BEAM-3327
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Axel Magnuson
>            Priority: Major
>              Labels: portability
>          Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to