[ 
https://issues.apache.org/jira/browse/BEAM-2899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290188#comment-16290188
 ] 

ASF GitHub Bot commented on BEAM-2899:
--------------------------------------

tgroh closed pull request #4188: [BEAM-2899] Implement most of JobApi#run
URL: https://github.com/apache/beam/pull/4188
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index 8eb1e509de6..24e15e1b963 100644
--- a/pom.xml
+++ b/pom.xml
@@ -695,6 +695,12 @@
         <version>${project.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-runners-reference-java</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.beam</groupId>
         <artifactId>beam-runners-reference-job-orchestrator</artifactId>
diff --git a/runners/java-fn-execution/pom.xml 
b/runners/java-fn-execution/pom.xml
index 85d4da1a6fc..f3881649eda 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -114,5 +114,12 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
index e6a057b7bf5..2b0d17ec616 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
@@ -57,7 +57,6 @@ private GrpcFnServer(Server server, ServiceT service, 
ApiServiceDescriptor apiSe
     this.server = server;
     this.service = service;
     this.apiServiceDescriptor = apiServiceDescriptor;
-    server.start();
   }
 
   /**
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
index 9fe4a5fa07c..535de68922d 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
@@ -43,12 +43,15 @@ public Server allocatePortAndCreate(BindableService 
service, ApiServiceDescripto
       throws IOException {
     String name = String.format("InProcessServer_%s", 
serviceNameUniqifier.getAndIncrement());
     builder.setUrl(name);
-    return InProcessServerBuilder.forName(name).addService(service).build();
+    return 
InProcessServerBuilder.forName(name).addService(service).build().start();
   }
 
   @Override
-  public Server create(
-      BindableService service, ApiServiceDescriptor serviceDescriptor) throws 
IOException {
-    return 
InProcessServerBuilder.forName(serviceDescriptor.getUrl()).addService(service).build();
+  public Server create(BindableService service, ApiServiceDescriptor 
serviceDescriptor)
+      throws IOException {
+    return InProcessServerBuilder.forName(serviceDescriptor.getUrl())
+        .addService(service)
+        .build()
+        .start();
   }
 }
diff --git a/runners/reference/java/build.gradle 
b/runners/reference/java/build.gradle
new file mode 100644
index 00000000000..a256d32d44b
--- /dev/null
+++ b/runners/reference/java/build.gradle
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply from: project(":").file("build_rules.gradle")
+applyJavaNature()
+
+description = "Apache Beam :: Runners :: Reference :: Java"
+
+dependencies {
+  shadow project(path: ":beam-model-parent:beam-model-pipeline", 
configuration: "shadow")
+  shadow project(path: 
":beam-runners-parent:beam-runners-core-construction-java", configuration: 
"shadow")
+  shadow library.java.slf4j_api
+  testCompile library.java.junit
+  testCompile library.java.hamcrest_core
+  testCompile library.java.slf4j_jdk14
+}
+
+task packageTests(type: Jar) {
+  from sourceSets.test.output
+  classifier = "tests"
+}
+
+artifacts.archives packageTests
diff --git a/runners/reference/java/pom.xml b/runners/reference/java/pom.xml
new file mode 100644
index 00000000000..4b535f7f8c6
--- /dev/null
+++ b/runners/reference/java/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-reference-parent</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-reference-java</artifactId>
+
+  <name>Apache Beam :: Runners :: Reference :: Java</name>
+  <description>
+    A Java implementation of the Beam Model which utilizes the portability
+    framework to execute user-definied functions.
+  </description>
+
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-model-pipeline</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-construction-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+
+    <!-- build dependencies -->
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git 
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/ReferenceRunner.java
 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/ReferenceRunner.java
new file mode 100644
index 00000000000..ea80d0bfaa6
--- /dev/null
+++ 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/ReferenceRunner.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.reference;
+
+import com.google.protobuf.Struct;
+import java.nio.file.Path;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+
+/**
+ * A {@code PipelineRunner} that executes a job via the Beam portability 
framework.
+ */
+public class ReferenceRunner {
+  public static void run(Pipeline p, Struct options, Path stagingLocation) 
throws Exception {
+    // Validate that the pipeline is well-formed.
+    PipelineTranslation.fromProto(p);
+    throw new UnsupportedOperationException("Not implemented");
+  }
+}
diff --git 
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/package-info.java
 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/package-info.java
new file mode 100644
index 00000000000..d15cf635709
--- /dev/null
+++ 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Support for executing a pipeline locally over the Beam fn API.
+ */
+package org.apache.beam.runners.reference;
diff --git a/runners/reference/job-server/build.gradle 
b/runners/reference/job-server/build.gradle
index 4e959bfdca8..f5bc864e3cf 100644
--- a/runners/reference/job-server/build.gradle
+++ b/runners/reference/job-server/build.gradle
@@ -25,6 +25,7 @@ dependencies {
   shadow library.java.grpc_netty
   shadow project(path: ":beam-model-parent:beam-model-job-management", 
configuration: "shadow")
   shadow project(path: ":beam-runners-parent:beam-java-fn-execution", 
configuration: "shadow")
+  shadow project(path: 
":beam-runners-parent:beam-runners-reference-parent:beam-runners-reference-java",
 configuration: "shadow")
   shadow project(path: 
":beam-runners-parent:beam-local-artifact-service-java", configuration: 
"shadow")
   shadow library.java.grpc_core
   shadow library.java.grpc_stub
diff --git a/runners/reference/job-server/pom.xml 
b/runners/reference/job-server/pom.xml
index ae288c8a78e..3d77872a782 100644
--- a/runners/reference/job-server/pom.xml
+++ b/runners/reference/job-server/pom.xml
@@ -62,6 +62,11 @@
       <artifactId>beam-local-artifact-service-java</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-reference-java</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
diff --git 
a/runners/reference/job-server/src/main/java/org/apache/beam/runners/reference/job/PreparingJob.java
 
b/runners/reference/job-server/src/main/java/org/apache/beam/runners/reference/job/PreparingJob.java
index 80505cafecf..5d441fed0df 100644
--- 
a/runners/reference/job-server/src/main/java/org/apache/beam/runners/reference/job/PreparingJob.java
+++ 
b/runners/reference/job-server/src/main/java/org/apache/beam/runners/reference/job/PreparingJob.java
@@ -20,6 +20,7 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.protobuf.Struct;
+import java.nio.file.Path;
 import org.apache.beam.artifact.local.LocalFileSystemArtifactStagerService;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
@@ -33,6 +34,7 @@ public static Builder builder() {
 
   abstract Pipeline getPipeline();
   abstract Struct getOptions();
+  abstract Path getStagingLocation();
   abstract GrpcFnServer<LocalFileSystemArtifactStagerService> 
getArtifactStagingServer();
 
   @Override
@@ -46,6 +48,8 @@ public void close() throws Exception {
 
     abstract Builder setOptions(Struct options);
 
+    abstract Builder setStagingLocation(Path stagingLocation);
+
     abstract Builder setArtifactStagingServer(
         GrpcFnServer<LocalFileSystemArtifactStagerService> server);
 
diff --git 
a/runners/reference/job-server/src/main/java/org/apache/beam/runners/reference/job/ReferenceRunnerJobService.java
 
b/runners/reference/job-server/src/main/java/org/apache/beam/runners/reference/job/ReferenceRunnerJobService.java
index 88afab354f9..4a5d7e8024e 100644
--- 
a/runners/reference/job-server/src/main/java/org/apache/beam/runners/reference/job/ReferenceRunnerJobService.java
+++ 
b/runners/reference/job-server/src/main/java/org/apache/beam/runners/reference/job/ReferenceRunnerJobService.java
@@ -22,6 +22,7 @@
 
 import com.google.common.collect.ImmutableList;
 import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -38,10 +39,12 @@
 import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
 import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
 import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
 import 
org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceImplBase;
 import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.reference.ReferenceRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,8 +81,9 @@ public void prepare(
       LOG.trace("{} {}", PrepareJobResponse.class.getSimpleName(), request);
 
       String preparationId = request.getJobName() + 
ThreadLocalRandom.current().nextInt();
+      Path tempDir = Files.createTempDirectory("reference-runner-staging");
       GrpcFnServer<LocalFileSystemArtifactStagerService> 
artifactStagingService =
-          createArtifactStagingService(preparationId);
+          createArtifactStagingService();
       PreparingJob previous =
           unpreparedJobs.putIfAbsent(
               preparationId,
@@ -87,6 +91,7 @@ public void prepare(
                   .setArtifactStagingServer(artifactStagingService)
                   .setPipeline(request.getPipeline())
                   .setOptions(request.getPipelineOptions())
+                  .setStagingLocation(tempDir)
                   .build());
       checkArgument(
           previous == null, "Unexpected existing job with preparation ID %s", 
preparationId);
@@ -103,8 +108,8 @@ public void prepare(
     }
   }
 
-  private GrpcFnServer<LocalFileSystemArtifactStagerService> 
createArtifactStagingService(
-      String preparationId) throws Exception {
+  private GrpcFnServer<LocalFileSystemArtifactStagerService> 
createArtifactStagingService()
+      throws Exception {
     LocalFileSystemArtifactStagerService service =
         
LocalFileSystemArtifactStagerService.withRootDirectory(stagingPathSupplier.call().toFile());
     return GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
@@ -113,9 +118,34 @@ public void prepare(
   @Override
   public void run(
       JobApi.RunJobRequest request, StreamObserver<JobApi.RunJobResponse> 
responseObserver) {
-    LOG.trace("{} {}", RunJobRequest.class.getSimpleName(), request);
-    System.err.println("Run Job Blah");
-    responseObserver.onError(Status.UNIMPLEMENTED.asException());
+    try {
+      LOG.trace("{} {}", RunJobRequest.class.getSimpleName(), request);
+      String preparationId = request.getPreparationId();
+      PreparingJob preparingJob = unpreparedJobs.get(preparationId);
+      if (preparingJob == null) {
+        responseObserver.onError(
+            Status.INVALID_ARGUMENT
+                .withDescription(String.format("Unknown Preparation Id %s", 
preparationId))
+                .asException());
+        return;
+      }
+      try {
+        // Close any preparation-time only resources.
+        preparingJob.close();
+      } catch (Exception e) {
+        responseObserver.onError(e);
+      }
+      // TODO: Return a real java 'job handle'; this gets used in getState, 
cancel, etc
+      ReferenceRunner.run(
+          preparingJob.getPipeline(), preparingJob.getOptions(), 
preparingJob.getStagingLocation());
+      String jobId = preparingJob + 
Integer.toString(ThreadLocalRandom.current().nextInt());
+      
responseObserver.onNext(RunJobResponse.newBuilder().setJobId(jobId).build());
+      responseObserver.onCompleted();
+    } catch (StatusRuntimeException e) {
+      responseObserver.onError(e);
+    } catch (Exception e) {
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+    }
   }
 
   @Override
diff --git a/runners/reference/pom.xml b/runners/reference/pom.xml
index 0c7f93922fd..66922f32080 100644
--- a/runners/reference/pom.xml
+++ b/runners/reference/pom.xml
@@ -34,6 +34,7 @@
 
   <packaging>pom</packaging>
   <modules>
+    <module>java</module>
     <module>job-server</module>
   </modules>
 </project>
diff --git a/settings.gradle b/settings.gradle
index 7eb25aac362..1ed6564cbca 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -76,6 +76,7 @@ include ':beam-runners-parent:beam-java-fn-execution'
 include ':beam-runners-parent:beam-runners-core-construction-java'
 include ':beam-runners-parent:beam-runners-core-java'
 include ':beam-runners-parent:beam-local-artifact-service-java'
+include 
':beam-runners-parent:beam-runners-reference-parent:beam-runners-reference-java'
 include 
':beam-runners-parent:beam-runners-reference-parent:beam-runners-reference-job-orchestrator'
 include ':beam-runners-parent:beam-runners-reference-parent'
 include ':beam-runners-parent:beam-runners-direct-java'
@@ -153,6 +154,7 @@ 
project(':beam-runners-parent:beam-java-fn-execution').projectDir = "$rootDir/ru
 project(':beam-runners-parent:beam-runners-core-construction-java').projectDir 
= "$rootDir/runners/core-construction-java" as File
 project(':beam-runners-parent:beam-runners-core-java').projectDir = 
"$rootDir/runners/core-java" as File
 project(':beam-runners-parent:beam-local-artifact-service-java').projectDir = 
"$rootDir/runners/local-artifact-service-java" as File
+project(':beam-runners-parent:beam-runners-reference-parent:beam-runners-reference-java').projectDir
 = "$rootDir/runners/reference/java" as File
 
project(':beam-runners-parent:beam-runners-reference-parent:beam-runners-reference-job-orchestrator').projectDir
 = "$rootDir/runners/reference/job-server" as File
 project(':beam-runners-parent:beam-runners-reference-parent').projectDir = 
"$rootDir/runners/reference" as File
 project(':beam-runners-parent:beam-runners-direct-java').projectDir = 
"$rootDir/runners/direct-java" as File


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Universal Local Runner
> ----------------------
>
>                 Key: BEAM-2899
>                 URL: https://issues.apache.org/jira/browse/BEAM-2899
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-core
>            Reporter: Henning Rohde
>            Assignee: Thomas Groh
>              Labels: portability
>
> To make the portability effort tractable, we should implement a Universal 
> Local Runner (ULR) in Java that runs in a single server process plus docker 
> containers for the SDK harness containers. It would serve multiple purposes:
>   (1) A reference implementation for other runners. Ideally, any new feature 
> should be implemented in the ULR first.
>   (2) A fully-featured test runner for SDKs that participate in the 
> portability framework. It thus complements the direct runners.
>   (3) A test runner for user code that depends on or customizes the runtime 
> environment. For example, a DoFn that shells out has a dependency that may be 
> satisfied on the user's desktop (and thus works fine on the direct runner), 
> but perhaps not by the container harness image. The ULR allows for an easy 
> way to find out.
> The Java direct runner presumably has lots of pieces that can be reused.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to