[ 
https://issues.apache.org/jira/browse/BEAM-2899?focusedWorklogId=108253&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108253
 ]

ASF GitHub Bot logged work on BEAM-2899:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Jun/18 01:40
            Start Date: 02/Jun/18 01:40
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5535: [BEAM-2899][BEAM-2885] 
Use Docker in the ReferenceRunner
URL: https://github.com/apache/beam/pull/5535
 
 
   

This pull request was merged from a forked repository (a foreign pull
request). Because GitHub does not display the original diff of such a pull
request once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index 011abdcf4b7..ff52d78105c 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -32,7 +32,6 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ReadPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
-import org.apache.beam.sdk.util.ReleaseInfo;
 
 /**
  * Utilities for interacting with portability {@link Environment environments}.
@@ -49,9 +48,7 @@
   private static final EnvironmentIdExtractor DEFAULT_SPEC_EXTRACTOR = 
(transform) -> null;
 
   private static final String JAVA_SDK_HARNESS_CONTAINER_URL =
-      String.format(
-          "%s-%s",
-          ReleaseInfo.getReleaseInfo().getName(), 
ReleaseInfo.getReleaseInfo().getVersion());
+      String.format("%s-docker-apache.bintray.io/beam/java", 
System.getenv("USER"));
   public static final Environment JAVA_SDK_HARNESS_ENVIRONMENT =
       Environment.newBuilder().setUrl(JAVA_SDK_HARNESS_CONTAINER_URL).build();
 
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/QuiescenceDriver.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/QuiescenceDriver.java
index 143a0ba82d1..277f02ef8aa 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/QuiescenceDriver.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/QuiescenceDriver.java
@@ -296,6 +296,7 @@ public final void handleException(CommittedBundle<?> 
inputBundle, Exception e) {
 
     @Override
     public void handleError(Error err) {
+      outstandingWork.decrementAndGet();
       pipelineMessageReceiver.failed(err);
     }
   }
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
index 18b2a7d85b4..1763086d4e7 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -24,10 +24,13 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.Struct;
-import java.io.IOException;
+import java.io.File;
 import java.util.Collections;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import org.apache.beam.model.fnexecution.v1.ProvisionApi.ProvisionInfo;
+import org.apache.beam.model.fnexecution.v1.ProvisionApi.Resources;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
@@ -36,11 +39,11 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.runners.core.construction.ModelCoders;
 import org.apache.beam.runners.core.construction.ModelCoders.KvCoderComponents;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.SyntheticComponents;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
@@ -48,35 +51,54 @@
 import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
 import 
org.apache.beam.runners.core.construction.graph.ProtoOverrides.TransformReplacement;
 import org.apache.beam.runners.direct.ExecutableGraph;
+import 
org.apache.beam.runners.direct.portable.artifact.LocalFileSystemArtifactRetrievalService;
+import 
org.apache.beam.runners.direct.portable.artifact.UnsupportedArtifactRetrievalService;
 import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.InProcessServerFactory;
 import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
 import org.apache.beam.runners.fnexecution.control.ControlClientPool;
 import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
 import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
+import 
org.apache.beam.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory;
 import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import 
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
 import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
 import 
org.apache.beam.runners.fnexecution.environment.InProcessEnvironmentFactory;
 import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
 import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
+import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /** The "ReferenceRunner" engine implementation. */
-class ReferenceRunner {
+public class ReferenceRunner {
   private final RunnerApi.Pipeline pipeline;
   private final Struct options;
+  @Nullable private final File artifactsDir;
 
-  private ReferenceRunner(RunnerApi.Pipeline p, Struct options) throws 
IOException {
+  private final EnvironmentType environmentType;
+
+  private ReferenceRunner(
+      Pipeline p, Struct options, @Nullable File artifactsDir, EnvironmentType 
environmentType) {
     this.pipeline = executable(p);
     this.options = options;
+    this.artifactsDir = artifactsDir;
+    this.environmentType = environmentType;
+  }
+
+  public static ReferenceRunner forPipeline(
+      RunnerApi.Pipeline p, Struct options, File artifactsDir) {
+    return new ReferenceRunner(p, options, artifactsDir, 
EnvironmentType.DOCKER);
   }
 
-  static ReferenceRunner forPipeline(RunnerApi.Pipeline p, Struct options) 
throws IOException {
-    return new ReferenceRunner(p, options);
+  static ReferenceRunner forInProcessPipeline(RunnerApi.Pipeline p, Struct 
options) {
+    return new ReferenceRunner(p, options, null, EnvironmentType.IN_PROCESS);
   }
 
   private RunnerApi.Pipeline executable(RunnerApi.Pipeline original) {
@@ -98,9 +120,27 @@ public void execute() throws Exception {
     ServerFactory serverFactory = createServerFactory();
     ControlClientPool controlClientPool = MapControlClientPool.create();
     ExecutorService dataExecutor = Executors.newCachedThreadPool();
+    ProvisionInfo provisionInfo =
+        ProvisionInfo.newBuilder()
+            .setJobId("id")
+            .setJobName("reference")
+            .setPipelineOptions(options)
+            .setWorkerId("foo")
+            .setResourceLimits(Resources.getDefaultInstance())
+            .build();
     try (GrpcFnServer<GrpcLoggingService> logging =
             GrpcFnServer.allocatePortAndCreateFor(
                 GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), 
serverFactory);
+        GrpcFnServer<ArtifactRetrievalService> artifact =
+            artifactsDir == null
+                ? GrpcFnServer.allocatePortAndCreateFor(
+                    UnsupportedArtifactRetrievalService.create(), 
serverFactory)
+                : GrpcFnServer.allocatePortAndCreateFor(
+                    
LocalFileSystemArtifactRetrievalService.forRootDirectory(artifactsDir),
+                    serverFactory);
+        GrpcFnServer<StaticGrpcProvisionService> provisioning =
+            GrpcFnServer.allocatePortAndCreateFor(
+                StaticGrpcProvisionService.create(provisionInfo), 
serverFactory);
         GrpcFnServer<FnApiControlClientPoolService> control =
             GrpcFnServer.allocatePortAndCreateFor(
                 FnApiControlClientPoolService.offeringClientsToPool(
@@ -114,13 +154,10 @@ public void execute() throws Exception {
             GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), 
serverFactory)) {
 
       EnvironmentFactory environmentFactory =
-          InProcessEnvironmentFactory.create(
-              PipelineOptionsTranslation.fromProto(options),
-              logging,
-              control,
-              controlClientPool.getSource());
+          createEnvironmentFactory(
+              control, logging, artifact, provisioning, 
controlClientPool.getSource());
       JobBundleFactory jobBundleFactory =
-          DirectJobBundleFactory.create(environmentFactory, data, state);
+          SingleEnvironmentInstanceJobBundleFactory.create(environmentFactory, 
data, state);
 
       TransformEvaluatorRegistry transformRegistry =
           TransformEvaluatorRegistry.portableRegistry(
@@ -140,7 +177,39 @@ public void execute() throws Exception {
   }
 
   private ServerFactory createServerFactory() {
-    return InProcessServerFactory.create();
+    switch (environmentType) {
+      case DOCKER:
+        return ServerFactory.createDefault();
+      case IN_PROCESS:
+        return InProcessServerFactory.create();
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown %s %s", 
EnvironmentType.class.getSimpleName(), environmentType));
+    }
+  }
+
+  private EnvironmentFactory createEnvironmentFactory(
+      GrpcFnServer<FnApiControlClientPoolService> control,
+      GrpcFnServer<GrpcLoggingService> logging,
+      GrpcFnServer<ArtifactRetrievalService> artifact,
+      GrpcFnServer<StaticGrpcProvisionService> provisioning,
+      ControlClientPool.Source controlClientSource) {
+    switch (environmentType) {
+      case DOCKER:
+        return DockerEnvironmentFactory.forServices(
+            control,
+            logging,
+            artifact,
+            provisioning,
+            controlClientSource,
+            IdGenerators.incrementingLongs());
+      case IN_PROCESS:
+        return InProcessEnvironmentFactory.create(
+            PipelineOptionsFactory.create(), logging, control, 
controlClientSource);
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown %s %s", 
EnvironmentType.class.getSimpleName(), environmentType));
+    }
   }
 
   @VisibleForTesting
@@ -218,4 +287,9 @@ public MessageWithComponents getReplacement(String gbkId, 
ComponentsOrBuilder co
           .build();
     }
   }
+
+  private enum EnvironmentType {
+    DOCKER,
+    IN_PROCESS
+  }
 }
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
index f34c4d54d49..b648205fe63 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
@@ -67,7 +67,7 @@ private RemoteStageEvaluator(PTransformNode transform) throws 
Exception {
           ExecutableStage.fromPayload(
               
ExecutableStagePayload.parseFrom(transform.getTransform().getSpec().getPayload()));
       outputs = new ArrayList<>();
-      StageBundleFactory stageFactory = jobFactory.forStage(stage);
+      StageBundleFactory<T> stageFactory = jobFactory.forStage(stage);
       bundle =
           stageFactory.getBundle(
               BundleFactoryOutputRecieverFactory.create(
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorRegistry.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorRegistry.java
index 0a919e406f0..a27bc07ef08 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorRegistry.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorRegistry.java
@@ -90,7 +90,6 @@ private TransformEvaluatorRegistry(
 
     TransformEvaluatorFactory factory =
         checkNotNull(factories.get(urn), "No evaluator for PTransform \"%s\"", 
urn);
-    LOG.warn("Evaluator Factory {} for PTransform {}", factory, application);
     return factory.forApplication(application, inputBundle);
   }
 
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalService.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalService.java
new file mode 100644
index 00000000000..0740202491b
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalService.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable.artifact;
+
+import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+
+/**
+ * An {@link ArtifactRetrievalService} which has not implemented any methods.
+ *
+ * <p>For use with an in-process SDK harness.
+ */
+public class UnsupportedArtifactRetrievalService
+    extends ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceImplBase
+    implements ArtifactRetrievalService {
+
+  public static ArtifactRetrievalService create() {
+    return new UnsupportedArtifactRetrievalService();
+  }
+
+  private UnsupportedArtifactRetrievalService() {}
+
+  @Override
+  public void close() {
+    // Do nothing.
+  }
+}
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
index 274088e8ca2..1a012653ec2 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
@@ -21,6 +21,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 import io.grpc.stub.StreamObserver;
@@ -29,6 +30,8 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.model.jobmanagement.v1.JobApi;
 import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
@@ -39,6 +42,7 @@
 import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
 import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
 import 
org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceImplBase;
+import org.apache.beam.runners.direct.portable.ReferenceRunner;
 import 
org.apache.beam.runners.direct.portable.artifact.LocalFileSystemArtifactStagerService;
 import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
@@ -51,19 +55,29 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(ReferenceRunnerJobService.class);
 
   public static ReferenceRunnerJobService create(final ServerFactory 
serverFactory) {
-    return new ReferenceRunnerJobService(serverFactory, filesTempDirectory());
+    return new ReferenceRunnerJobService(
+        serverFactory, () -> 
Files.createTempDirectory("reference-runner-staging"));
   }
 
   private final ServerFactory serverFactory;
-  private final Callable<Path> stagingPathSupplier;
+  private final Callable<Path> stagingPathCallable;
 
   private final ConcurrentMap<String, PreparingJob> unpreparedJobs;
+  private final ConcurrentMap<String, ReferenceRunner> runningJobs;
+  private final ExecutorService executor;
 
   private ReferenceRunnerJobService(
-      ServerFactory serverFactory, Callable<Path> stagingPathSupplier) {
+      ServerFactory serverFactory, Callable<Path> stagingPathCallable) {
     this.serverFactory = serverFactory;
-    this.stagingPathSupplier = stagingPathSupplier;
+    this.stagingPathCallable = stagingPathCallable;
     unpreparedJobs = new ConcurrentHashMap<>();
+    runningJobs = new ConcurrentHashMap<>();
+    executor =
+        Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder()
+                .setDaemon(false)
+                .setNameFormat("reference-runner-pipeline-%s")
+                .build());
   }
 
   public ReferenceRunnerJobService withStagingPathSupplier(Callable<Path> 
supplier) {
@@ -78,9 +92,9 @@ public void prepare(
       LOG.trace("{} {}", PrepareJobResponse.class.getSimpleName(), request);
 
       String preparationId = request.getJobName() + 
ThreadLocalRandom.current().nextInt();
-      Path tempDir = Files.createTempDirectory("reference-runner-staging");
+      Path tempDir = stagingPathCallable.call();
       GrpcFnServer<LocalFileSystemArtifactStagerService> 
artifactStagingService =
-          createArtifactStagingService();
+          createArtifactStagingService(tempDir);
       PreparingJob previous =
           unpreparedJobs.putIfAbsent(
               preparationId,
@@ -105,10 +119,10 @@ public void prepare(
     }
   }
 
-  private GrpcFnServer<LocalFileSystemArtifactStagerService> 
createArtifactStagingService()
-      throws Exception {
+  private GrpcFnServer<LocalFileSystemArtifactStagerService> 
createArtifactStagingService(
+      Path stagingPath) throws Exception {
     LocalFileSystemArtifactStagerService service =
-        
LocalFileSystemArtifactStagerService.forRootDirectory(stagingPathSupplier.call().toFile());
+        
LocalFileSystemArtifactStagerService.forRootDirectory(stagingPath.toFile());
     return GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
   }
 
@@ -132,9 +146,21 @@ public void run(
       } catch (Exception e) {
         responseObserver.onError(e);
       }
+
+      ReferenceRunner runner =
+          ReferenceRunner.forPipeline(
+              preparingJob.getPipeline(),
+              preparingJob.getOptions(),
+              preparingJob.getStagingLocation().toFile());
       String jobId = preparingJob + 
Integer.toString(ThreadLocalRandom.current().nextInt());
       
responseObserver.onNext(RunJobResponse.newBuilder().setJobId(jobId).build());
       responseObserver.onCompleted();
+      runningJobs.put(jobId, runner);
+      executor.submit(
+          () -> {
+            runner.execute();
+            return null;
+          });
     } catch (StatusRuntimeException e) {
       responseObserver.onError(e);
     } catch (Exception e) {
@@ -171,8 +197,4 @@ public void close() throws Exception {
       }
     }
   }
-
-  private static Callable<Path> filesTempDirectory() {
-    return () -> Files.createTempDirectory("reference-runner-staging");
-  }
 }
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
index 2a249ddc269..48a14dea8eb 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
@@ -27,6 +27,7 @@
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -61,7 +62,8 @@ public void pipelineExecution() throws Exception {
                           public void process(ProcessContext ctxt) {
                             for (int i = 0; i < ctxt.element(); i++) {
                               ctxt.outputWithTimestamp(
-                                  KV.of("foo", i), new 
Instant(0).plus(Duration.standardHours(i)));
+                                  KV.of("foo", ctxt.element()),
+                                  new 
Instant(0).plus(Duration.standardHours(i)));
                             }
                             ctxt.output(originals, ctxt.element());
                           }
@@ -85,10 +87,16 @@ public void process(ProcessContext ctxt) {
                       }
                     }));
 
+    PAssert.that(grouped)
+        .containsInAnyOrder(
+            KV.of("foo", ImmutableSet.of(1, 2, 3)),
+            KV.of("foo", ImmutableSet.of(2, 3)),
+            KV.of("foo", ImmutableSet.of(3)));
+
     
p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
 
     ReferenceRunner runner =
-        ReferenceRunner.forPipeline(
+        ReferenceRunner.forInProcessPipeline(
             PipelineTranslation.toProto(p),
             
PipelineOptionsTranslation.toProto(PipelineOptionsFactory.create()));
     runner.execute();
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
index 1d72ed9f0da..2a96a849ca6 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
@@ -43,6 +43,7 @@
 import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import 
org.apache.beam.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory;
 import org.apache.beam.runners.fnexecution.data.GrpcDataService;
 import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
 import 
org.apache.beam.runners.fnexecution.environment.InProcessEnvironmentFactory;
@@ -107,7 +108,8 @@ public void setup() throws Exception {
 
     bundleFactory = ImmutableListBundleFactory.create();
     JobBundleFactory jobBundleFactory =
-        DirectJobBundleFactory.create(environmentFactory, dataServer, 
stateServer);
+        SingleEnvironmentInstanceJobBundleFactory.create(
+            environmentFactory, dataServer, stateServer);
     factory = new RemoteStageEvaluatorFactory(bundleFactory, jobBundleFactory);
   }
 
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalServiceTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalServiceTest.java
new file mode 100644
index 00000000000..ede7fe7849b
--- /dev/null
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/UnsupportedArtifactRetrievalServiceTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable.artifact;
+
+import static org.junit.Assert.fail;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.SynchronousQueue;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.GetArtifactRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.GetManifestRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.GetManifestResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link UnsupportedArtifactRetrievalService}. */
+@RunWith(JUnit4.class)
+public class UnsupportedArtifactRetrievalServiceTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private GrpcFnServer<ArtifactRetrievalService> server;
+  private ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceStub stub;
+
+  @Before
+  public void setUp() throws Exception {
+    server =
+        GrpcFnServer.allocatePortAndCreateFor(
+            UnsupportedArtifactRetrievalService.create(), 
InProcessServerFactory.create());
+    stub =
+        ArtifactRetrievalServiceGrpc.newStub(
+            InProcessManagedChannelFactory.create()
+                .forDescriptor(server.getApiServiceDescriptor()));
+  }
+
+  @Test
+  public void getArtifactThrows() throws Exception {
+    SynchronousQueue<Optional<Throwable>> thrown = new SynchronousQueue<>();
+    stub.getArtifact(
+        GetArtifactRequest.newBuilder().setName("foo").build(),
+        new StreamObserver<ArtifactChunk>() {
+          @Override
+          public void onNext(ArtifactChunk value) {
+            try {
+              thrown.put(Optional.empty());
+            } catch (InterruptedException e) {
+              fail();
+            }
+          }
+
+          @Override
+          public void onError(Throwable t) {
+            try {
+              thrown.put(Optional.of(t));
+            } catch (InterruptedException e) {
+              fail();
+            }
+          }
+
+          @Override
+          public void onCompleted() {
+            try {
+              thrown.put(Optional.empty());
+            } catch (InterruptedException e) {
+              fail();
+            }
+          }
+        });
+    Throwable wasThrown =
+        thrown
+            .take()
+            .orElseThrow(
+                () ->
+                    new AssertionError(
+                        String.format(
+                            "The %s should respond to all calls with an error",
+                            
UnsupportedArtifactRetrievalServiceTest.class.getSimpleName())));
+  }
+
+  @Test
+  public void getManifestThrows() throws Exception {
+    SynchronousQueue<Optional<Throwable>> thrown = new SynchronousQueue<>();
+    stub.getManifest(
+        GetManifestRequest.newBuilder().build(),
+        new StreamObserver<GetManifestResponse>() {
+          @Override
+          public void onNext(GetManifestResponse value) {
+            try {
+              thrown.put(Optional.empty());
+            } catch (InterruptedException e) {
+              fail();
+            }
+          }
+
+          @Override
+          public void onError(Throwable t) {
+            try {
+              thrown.put(Optional.of(t));
+            } catch (InterruptedException e) {
+              fail();
+            }
+          }
+
+          @Override
+          public void onCompleted() {
+            try {
+              thrown.put(Optional.empty());
+            } catch (InterruptedException e) {
+              fail();
+            }
+          }
+        });
+    Throwable wasThrown =
+        thrown
+            .take()
+            .orElseThrow(
+                () ->
+                    new AssertionError(
+                        String.format(
+                            "The %s should respond to all calls with an error",
+                            
UnsupportedArtifactRetrievalServiceTest.class.getSimpleName())));
+  }
+
+  @Test
+  public void closeCompletes() throws Exception {
+    server.getService().close();
+  }
+}
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
similarity index 86%
rename from 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
rename to 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
index 2eaf0ec990a..5dcc87a11dd 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.beam.runners.direct.portable;
+package org.apache.beam.runners.fnexecution.control;
 
 import com.google.common.collect.Iterables;
 import java.io.IOException;
@@ -28,14 +28,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
-import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
-import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
-import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
 import 
org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
-import org.apache.beam.runners.fnexecution.control.RemoteBundle;
-import org.apache.beam.runners.fnexecution.control.RemoteOutputReceiver;
-import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
-import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.data.GrpcDataService;
 import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
 import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
@@ -48,13 +41,21 @@
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.util.WindowedValue;
 
-/** A {@link JobBundleFactory} for the ReferenceRunner. */
-class DirectJobBundleFactory implements JobBundleFactory {
+/**
+ * A {@link JobBundleFactory} which can manage a single instance of an {@link 
Environment}.
+ *
+ * @deprecated replace with a {@link DockerJobBundleFactory} when appropriate 
if the {@link
+ *     EnvironmentFactory} is a {@link
+ *     
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory}, or 
create an
+ *     {@code InProcessJobBundleFactory} and inline the creation of the 
environment if appropriate.
+ */
+@Deprecated
+public class SingleEnvironmentInstanceJobBundleFactory implements 
JobBundleFactory {
   public static JobBundleFactory create(
       EnvironmentFactory environmentFactory,
       GrpcFnServer<GrpcDataService> data,
       GrpcFnServer<GrpcStateService> state) {
-    return new DirectJobBundleFactory(environmentFactory, data, state);
+    return new SingleEnvironmentInstanceJobBundleFactory(environmentFactory, 
data, state);
   }
 
   private final EnvironmentFactory environmentFactory;
@@ -69,7 +70,7 @@ public static JobBundleFactory create(
 
   private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
 
-  private DirectJobBundleFactory(
+  private SingleEnvironmentInstanceJobBundleFactory(
       EnvironmentFactory environmentFactory,
       GrpcFnServer<GrpcDataService> dataService,
       GrpcFnServer<GrpcStateService> stateService) {
@@ -114,7 +115,7 @@ private DirectJobBundleFactory(
             descriptor.getProcessBundleDescriptor(),
             (RemoteInputDestination<WindowedValue<T>>) 
(RemoteInputDestination) destination,
             stateService.getService());
-    return new DirectStageBundleFactory<>(descriptor, bundleProcessor);
+    return new BundleProcessorStageBundleFactory<>(descriptor, 
bundleProcessor);
   }
 
   @Override
@@ -136,11 +137,11 @@ public void close() throws Exception {
     }
   }
 
-  private static class DirectStageBundleFactory<T> implements 
StageBundleFactory<T> {
+  private static class BundleProcessorStageBundleFactory<T> implements 
StageBundleFactory<T> {
     private final ExecutableProcessBundleDescriptor descriptor;
     private final SdkHarnessClient.BundleProcessor<T> processor;
 
-    private DirectStageBundleFactory(
+    private BundleProcessorStageBundleFactory(
         ExecutableProcessBundleDescriptor descriptor,
         SdkHarnessClient.BundleProcessor<T> processor) {
       this.descriptor = descriptor;
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index d8529ee55c7..2c95ebc585a 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -41,7 +41,6 @@
 import org.apache.beam.fn.harness.FnHarness;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.FusedPipeline;
@@ -67,6 +66,7 @@
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
 import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -117,9 +117,7 @@ public void setup() throws Exception {
         GrpcFnServer.allocatePortAndCreateFor(
             GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), 
serverFactory);
     stateDelegator = GrpcStateService.create();
-    stateServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            stateDelegator, serverFactory);
+    stateServer = GrpcFnServer.allocatePortAndCreateFor(stateDelegator, 
serverFactory);
 
     ControlClientPool clientPool = MapControlClientPool.create();
     controlServer =
@@ -186,7 +184,6 @@ public void process(ProcessContext ctxt) {
         .apply("gbk", GroupByKey.create());
 
     RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
-    Components components = pipelineProto.getComponents();
     FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineProto);
     checkState(fused.getFusedStages().size() == 1, "Expected exactly one fused 
stage");
     ExecutableStage stage = fused.getFusedStages().iterator().next();
@@ -201,21 +198,24 @@ public void process(ProcessContext ctxt) {
 
     BundleProcessor<byte[]> processor =
         controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), 
remoteDestination);
-    Map<Target, Coder<WindowedValue<?>>> outputTargets = 
descriptor.getOutputTargetCoders();
-    Map<Target, Collection<WindowedValue<?>>> outputValues = new HashMap<>();
+    Map<Target, ? super Coder<WindowedValue<?>>> outputTargets = 
descriptor.getOutputTargetCoders();
+    Map<Target, Collection<? super WindowedValue<?>>> outputValues = new 
HashMap<>();
     Map<Target, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
-    for (Entry<Target, Coder<WindowedValue<?>>> targetCoder : 
outputTargets.entrySet()) {
-      List<WindowedValue<?>> outputContents = Collections.synchronizedList(new 
ArrayList<>());
+    for (Entry<Target, ? super Coder<WindowedValue<?>>> targetCoder : 
outputTargets.entrySet()) {
+      List<? super WindowedValue<?>> outputContents =
+          Collections.synchronizedList(new ArrayList<>());
       outputValues.put(targetCoder.getKey(), outputContents);
       outputReceivers.put(
           targetCoder.getKey(),
-          RemoteOutputReceiver.of(targetCoder.getValue(), 
outputContents::add));
+          RemoteOutputReceiver.of(
+              (Coder) targetCoder.getValue(),
+              (FnDataReceiver<? super WindowedValue<?>>) outputContents::add));
     }
     // The impulse example
     try (ActiveBundle<byte[]> bundle = processor.newBundle(outputReceivers)) {
       bundle.getInputReceiver().accept(WindowedValue.valueInGlobalWindow(new 
byte[0]));
     }
-    for (Collection<WindowedValue<?>> windowedValues : outputValues.values()) {
+    for (Collection<? super WindowedValue<?>> windowedValues : 
outputValues.values()) {
       assertThat(
           windowedValues,
           containsInAnyOrder(
@@ -228,40 +228,44 @@ public void process(ProcessContext ctxt) {
   @Test
   public void testExecutionWithSideInput() throws Exception {
     Pipeline p = Pipeline.create();
-    PCollection<String> input = p.apply("impulse", Impulse.create())
-        .apply(
-            "create",
-            ParDo.of(
-                new DoFn<byte[], String>() {
-                  @ProcessElement
-                  public void process(ProcessContext ctxt) {
-                    ctxt.output("zero");
-                    ctxt.output("one");
-                    ctxt.output("two");
-                  }
-                })).setCoder(StringUtf8Coder.of());
-    PCollectionView<Iterable<String>> view =
-        input.apply("createSideInput", View.asIterable());
+    PCollection<String> input =
+        p.apply("impulse", Impulse.create())
+            .apply(
+                "create",
+                ParDo.of(
+                    new DoFn<byte[], String>() {
+                      @ProcessElement
+                      public void process(ProcessContext ctxt) {
+                        ctxt.output("zero");
+                        ctxt.output("one");
+                        ctxt.output("two");
+                      }
+                    }))
+            .setCoder(StringUtf8Coder.of());
+    PCollectionView<Iterable<String>> view = input.apply("createSideInput", 
View.asIterable());
 
     input
-        .apply("readSideInput", ParDo.of(new DoFn<String, KV<String, 
String>>() {
-          @ProcessElement
-          public void processElement(ProcessContext context) {
-            for (String value : context.sideInput(view)) {
-              context.output(KV.of(context.element(), value));
-            }
-          }
-        }).withSideInputs(view))
+        .apply(
+            "readSideInput",
+            ParDo.of(
+                    new DoFn<String, KV<String, String>>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext context) {
+                        for (String value : context.sideInput(view)) {
+                          context.output(KV.of(context.element(), value));
+                        }
+                      }
+                    })
+                .withSideInputs(view))
         .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
         // Force the output to be materialized
         .apply("gbk", GroupByKey.create());
 
     RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
-    Components components = pipelineProto.getComponents();
     FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineProto);
-    Optional<ExecutableStage> optionalStage = Iterables.tryFind(
-        fused.getFusedStages(),
-        (ExecutableStage stage) -> !stage.getSideInputs().isEmpty());
+    Optional<ExecutableStage> optionalStage =
+        Iterables.tryFind(
+            fused.getFusedStages(), (ExecutableStage stage) -> 
!stage.getSideInputs().isEmpty());
     checkState(optionalStage.isPresent(), "Expected a stage with side 
inputs.");
     ExecutableStage stage = optionalStage.get();
 
@@ -276,8 +280,9 @@ public void processElement(ProcessContext context) {
         (RemoteInputDestination<WindowedValue<byte[]>>)
             (RemoteInputDestination) descriptor.getRemoteInputDestination();
 
-    BundleProcessor<byte[]> processor = controlClient.getProcessor(
-        descriptor.getProcessBundleDescriptor(), remoteDestination, 
stateDelegator);
+    BundleProcessor<byte[]> processor =
+        controlClient.getProcessor(
+            descriptor.getProcessBundleDescriptor(), remoteDestination, 
stateDelegator);
     Map<Target, Coder<WindowedValue<?>>> outputTargets = 
descriptor.getOutputTargetCoders();
     Map<Target, Collection<WindowedValue<?>>> outputValues = new HashMap<>();
     Map<Target, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
@@ -289,17 +294,21 @@ public void processElement(ProcessContext context) {
           RemoteOutputReceiver.of(targetCoder.getValue(), 
outputContents::add));
     }
 
-    Iterable<byte[]> sideInputData = Arrays.asList(
-        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A"),
-        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B"),
-        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C"));
+    Iterable<byte[]> sideInputData =
+        Arrays.asList(
+            CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A"),
+            CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B"),
+            CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C"));
     StateRequestHandler stateRequestHandler =
         StateRequestHandlers.forMultimapSideInputHandlerFactory(
             descriptor,
             new MultimapSideInputHandlerFactory() {
               @Override
               public <K, V, W extends BoundedWindow> 
MultimapSideInputHandler<K, V, W> forSideInput(
-                  String pTransformId, String sideInputId, Coder<K> keyCoder, 
Coder<V> valueCoder,
+                  String pTransformId,
+                  String sideInputId,
+                  Coder<K> keyCoder,
+                  Coder<V> valueCoder,
                   Coder<W> windowCoder) {
                 return new MultimapSideInputHandler<K, V, W>() {
                   @Override
@@ -311,10 +320,16 @@ public void processElement(ProcessContext context) {
             });
 
     try (ActiveBundle<byte[]> bundle = processor.newBundle(outputReceivers, 
stateRequestHandler)) {
-      bundle.getInputReceiver().accept(WindowedValue.valueInGlobalWindow(
-          CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
-      bundle.getInputReceiver().accept(WindowedValue.valueInGlobalWindow(
-          CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y")));
+      bundle
+          .getInputReceiver()
+          .accept(
+              WindowedValue.valueInGlobalWindow(
+                  CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
+      bundle
+          .getInputReceiver()
+          .accept(
+              WindowedValue.valueInGlobalWindow(
+                  CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y")));
     }
     for (Collection<WindowedValue<?>> windowedValues : outputValues.values()) {
       assertThat(
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactoryTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
similarity index 95%
rename from 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactoryTest.java
rename to 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
index 1c14145914a..760154c9d40 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactoryTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.beam.runners.direct.portable;
+package org.apache.beam.runners.fnexecution.control;
 
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertThat;
@@ -42,8 +42,6 @@
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.InProcessServerFactory;
-import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
-import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
 import org.apache.beam.runners.fnexecution.data.GrpcDataService;
 import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
 import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
@@ -58,9 +56,9 @@
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
-/** Tests for {@link DirectJobBundleFactory}. */
+/** Tests for {@link SingleEnvironmentInstanceJobBundleFactory}. */
 @RunWith(JUnit4.class)
-public class DirectJobBundleFactoryTest {
+public class SingleEnvironmentInstanceJobBundleFactoryTest {
   @Mock private EnvironmentFactory environmentFactory;
   @Mock private InstructionRequestHandler instructionRequestHandler;
 
@@ -81,7 +79,9 @@ public void setup() throws Exception {
         
GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(executor), 
serverFactory);
     stateServer = 
GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);
 
-    factory = DirectJobBundleFactory.create(environmentFactory, dataServer, 
stateServer);
+    factory =
+        SingleEnvironmentInstanceJobBundleFactory.create(
+            environmentFactory, dataServer, stateServer);
   }
 
   @After
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml 
b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 319b8eac28c..4b15554e5a4 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -163,6 +163,13 @@
       can never refuse an offered update -->
   </Match>
 
+  <Match>
+    <Class 
name="org.apache.beam.runners.direct.portable.job.ReferenceRunnerJobService" />
+    <Method name="run" />
+    <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
+    <!-- the success of future methods aren't based on the returned value of 
the future -->
+  </Match>
+
   <Match>
     <Class 
name="org.apache.beam.runners.spark.util.BroadcastHelper$CodedBroadcastHelper"/>
     <Or>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 108253)
    Time Spent: 1h 10m  (was: 1h)

> Universal Local Runner
> ----------------------
>
>                 Key: BEAM-2899
>                 URL: https://issues.apache.org/jira/browse/BEAM-2899
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-core
>            Reporter: Henning Rohde
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> To make the portability effort tractable, we should implement a Universal 
> Local Runner (ULR) in Java that runs in a single server process plus Docker 
> containers for the SDK harnesses. It would serve multiple purposes:
>   (1) A reference implementation for other runners. Ideally, any new feature 
> should be implemented in the ULR first.
>   (2) A fully-featured test runner for SDKs that participate in the 
> portability framework. It thus complements the direct runners.
>   (3) A test runner for user code that depends on or customizes the runtime 
> environment. For example, a DoFn that shells out has a dependency that may be 
> satisfied on the user's desktop (and thus works fine on the direct runner), 
> but perhaps not by the container harness image. The ULR allows for an easy 
> way to find out.
> The Java direct runner presumably has lots of pieces that can be reused.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to