[ 
https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=176945&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-176945
 ]

ASF GitHub Bot logged work on BEAM-6094:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Dec/18 11:44
            Start Date: 19/Dec/18 11:44
    Worklog Time Spent: 10m 
    Work Description: robertwb closed pull request #7307: [BEAM-6094] Add loopback environment for java.
URL: https://github.com/apache/beam/pull/7307
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index bcca38476da7..9b660f96eb78 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -25,10 +25,12 @@
 import java.util.Map;
 import java.util.Optional;
 import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.DockerPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExternalPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ProcessPayload;
@@ -66,7 +68,9 @@
           
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
   public static final String ENVIRONMENT_DOCKER = "DOCKER";
   public static final String ENVIRONMENT_PROCESS = "PROCESS";
+  public static final String ENVIRONMENT_EXTERNAL = "EXTERNAL";
   public static final String ENVIRONMENT_EMBEDDED = "EMBEDDED"; // Non Public 
urn for testing
+  public static final String ENVIRONMENT_LOOPBACK = "LOOPBACK"; // Non Public 
urn for testing
 
   /* For development, use the container build by the current user to ensure 
that the SDK harness and
    * the SDK agree on how they should interact. This should be changed to a 
version-specific
@@ -90,6 +94,9 @@ public static Environment 
createOrGetDefaultEnvironment(String type, String conf
     switch (type) {
       case ENVIRONMENT_EMBEDDED:
         return createEmbeddedEnvironment(config);
+      case ENVIRONMENT_EXTERNAL:
+      case ENVIRONMENT_LOOPBACK:
+        return createExternalEnvironment(config);
       case ENVIRONMENT_PROCESS:
         return createProcessEnvironment(config);
       case ENVIRONMENT_DOCKER:
@@ -107,6 +114,17 @@ public static Environment createDockerEnvironment(String 
dockerImageUrl) {
         .build();
   }
 
+  private static Environment createExternalEnvironment(String config) {
+    return Environment.newBuilder()
+        .setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.EXTERNAL))
+        .setPayload(
+            ExternalPayload.newBuilder()
+                
.setEndpoint(ApiServiceDescriptor.newBuilder().setUrl(config).build())
+                .build()
+                .toByteString())
+        .build();
+  }
+
   private static Environment createProcessEnvironment(String config) {
     try {
       ProcessPayloadReferenceJSON payloadReferenceJSON =
diff --git a/runners/reference/java/build.gradle 
b/runners/reference/java/build.gradle
index 78ef5b07eb4f..edc1a29ba872 100644
--- a/runners/reference/java/build.gradle
+++ b/runners/reference/java/build.gradle
@@ -33,7 +33,9 @@ dependencies {
   compile library.java.hamcrest_library
   shadow project(path: ":beam-model-pipeline", configuration: "shadow")
   shadow project(path: ":beam-runners-core-construction-java", configuration: 
"shadow")
+  shadow project(path: ":beam-runners-java-fn-execution", configuration: 
"shadow")
   shadow project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
+  shadow project(path: ":beam-sdks-java-harness", configuration: "shadow")
   shadow library.java.vendored_grpc_1_13_1
   shadow library.java.slf4j_api
   shadowTest project(path: ":beam-runners-core-construction-java", 
configuration: "shadowTest")
diff --git 
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/ExternalWorkerService.java
 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/ExternalWorkerService.java
new file mode 100644
index 000000000000..8603cdedf48f
--- /dev/null
+++ 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/ExternalWorkerService.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import org.apache.beam.fn.harness.FnHarness;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.NotifyRunnerAvailableRequest;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.NotifyRunnerAvailableResponse;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolImplBase;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements the BeamFnExternalWorkerPool service by starting a fresh SDK 
harness for each request.
+ */
+public class ExternalWorkerService extends BeamFnExternalWorkerPoolImplBase 
implements FnService {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnExternalWorkerPoolImplBase.class);
+
+  private final PipelineOptions options;
+  private final ServerFactory serverFactory = ServerFactory.createDefault();
+
+  public ExternalWorkerService(PipelineOptions options) throws Exception {
+    this.options = options;
+  }
+
+  @Override
+  public void notifyRunnerAvailable(
+      NotifyRunnerAvailableRequest request,
+      StreamObserver<NotifyRunnerAvailableResponse> responseObserver) {
+    LOG.info(
+        "Starting worker {} pointing at {}.",
+        request.getWorkerId(),
+        request.getControlEndpoint().getUrl());
+    LOG.debug("Worker request {}.", request);
+    Thread th =
+        new Thread(
+            () -> {
+              try {
+                FnHarness.main(
+                    request.getWorkerId(),
+                    options,
+                    request.getLoggingEndpoint(),
+                    request.getControlEndpoint());
+                LOG.info("Successfully started worker {}.", 
request.getWorkerId());
+              } catch (Exception exn) {
+                LOG.error(String.format("Failed to start worker %s.", 
request.getWorkerId()), exn);
+              }
+            });
+    th.setName("SDK-worker-" + request.getWorkerId());
+    th.setDaemon(true);
+    th.start();
+
+    
responseObserver.onNext(NotifyRunnerAvailableResponse.newBuilder().build());
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void close() {}
+
+  public GrpcFnServer<ExternalWorkerService> start() throws Exception {
+    GrpcFnServer<ExternalWorkerService> server =
+        GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
+    LOG.debug(
+        "Listening for worker start requests at {}.", 
server.getApiServiceDescriptor().getUrl());
+    return server;
+  }
+}
diff --git 
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
index 8c49642775bc..ce29a927caab 100644
--- 
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
+++ 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
@@ -44,11 +44,14 @@
   private final ByteString jobId;
   private final CloseableResource<JobServiceBlockingStub> jobService;
   @Nullable private State terminationState;
+  @Nullable private Runnable cleanup;
 
-  JobServicePipelineResult(ByteString jobId, 
CloseableResource<JobServiceBlockingStub> jobService) {
+  JobServicePipelineResult(
+      ByteString jobId, CloseableResource<JobServiceBlockingStub> jobService, 
Runnable cleanup) {
     this.jobId = jobId;
     this.jobService = jobService;
     this.terminationState = null;
+    this.cleanup = cleanup;
   }
 
   @Override
@@ -124,6 +127,9 @@ public MetricResults metrics() {
   @Override
   public void close() {
     try (CloseableResource<JobServiceBlockingStub> jobService = 
this.jobService) {
+      if (cleanup != null) {
+        cleanup.run();
+      }
     } catch (Exception e) {
       LOG.warn("Error cleaning up job service", e);
     }
diff --git 
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
index 79f823bafee8..a3d1a44d1ced 100644
--- 
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
+++ 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
@@ -38,9 +38,11 @@
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.runners.core.construction.ArtifactServiceStager;
 import 
org.apache.beam.runners.core.construction.ArtifactServiceStager.StagedFile;
+import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.reference.CloseableResource.CloseException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -137,6 +139,32 @@ private PortableRunner(
   public PipelineResult run(Pipeline pipeline) {
     
pipeline.replaceAll(ImmutableList.of(JavaReadViaImpulse.boundedOverride()));
 
+    Runnable cleanup;
+    if (Environments.ENVIRONMENT_LOOPBACK.equals(
+        
options.as(PortablePipelineOptions.class).getDefaultEnvironmentType())) {
+      GrpcFnServer<ExternalWorkerService> workerService;
+      try {
+        workerService = new ExternalWorkerService(options).start();
+      } catch (Exception exn) {
+        throw new RuntimeException("Failed to start GrpcFnServer for 
ExternalWorkerService", exn);
+      }
+      LOG.info("Starting worker service at {}", 
workerService.getApiServiceDescriptor().getUrl());
+      options
+          .as(PortablePipelineOptions.class)
+          
.setDefaultEnvironmentConfig(workerService.getApiServiceDescriptor().getUrl());
+      cleanup =
+          () -> {
+            try {
+              LOG.warn("closing worker service {}", workerService);
+              workerService.close();
+            } catch (Exception exn) {
+              throw new RuntimeException(exn);
+            }
+          };
+    } else {
+      cleanup = null;
+    }
+
     LOG.debug("Initial files to stage: " + filesToStage);
 
     PrepareJobRequest prepareJobRequest =
@@ -187,7 +215,7 @@ public PipelineResult run(Pipeline pipeline) {
       LOG.info("RunJobResponse: {}", runJobResponse);
       ByteString jobId = runJobResponse.getJobIdBytes();
 
-      return new JobServicePipelineResult(jobId, wrappedJobService.transfer());
+      return new JobServicePipelineResult(jobId, wrappedJobService.transfer(), 
cleanup);
     } catch (CloseException e) {
       throw new RuntimeException(e);
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 176945)
    Time Spent: 4h 20m  (was: 4h 10m)

> Implement External environment for Portable Beam
> ------------------------------------------------
>
>                 Key: BEAM-6094
>                 URL: https://issues.apache.org/jira/browse/BEAM-6094
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Robert Bradshaw
>            Assignee: Robert Bradshaw
>            Priority: Major
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to