[ https://issues.apache.org/jira/browse/BEAM-4176?focusedWorklogId=134318&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134318 ]

ASF GitHub Bot logged work on BEAM-4176:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Aug/18 22:30
            Start Date: 13/Aug/18 22:30
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6073: [BEAM-4176] Validate 
Runner Tests generalization and enable for local reference runner
URL: https://github.com/apache/beam/pull/6073
 
 
   

This PR was merged from a forked repository.
Because GitHub does not show the original diff of a merged pull request from a fork,
the diff is reproduced below for the sake of provenance:

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index bc22bad8abd..37bb2457a17 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -19,9 +19,11 @@
 package org.apache.beam.gradle
 
 import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
+import groovy.json.JsonOutput
 import org.gradle.api.GradleException
 import org.gradle.api.Plugin
 import org.gradle.api.Project
+import org.gradle.api.artifacts.Configuration
 import org.gradle.api.file.FileTree
 import org.gradle.api.plugins.quality.Checkstyle
 import org.gradle.api.plugins.quality.FindBugs
@@ -193,6 +195,30 @@ class BeamModulePlugin implements Plugin<Project> {
     String tag = null // Sets the image tag (optional).
   }
 
+  // A class defining the configuration for PortableValidatesRunner.
+  class PortableValidatesRunnerConfiguration {
+    // Task name for validate runner case.
+    String name = 'validatesPortableRunner'
+    // Fully qualified JobServerClass name to use.
+    String jobServerDriver
+    // A string representing the jobServer Configuration.
+    String jobServerConfig
+    // Categories for tests to run.
+    Closure testCategories = {
+      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+      excludeCategories 
'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+      excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesDistributionMetrics'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+    }
+    // Configuration for the classpath when running the test.
+    Configuration testClasspathConfiguration
+  }
+
   def isRelease(Project project) {
     return project.hasProperty('isRelease')
   }
@@ -1392,5 +1418,38 @@ artifactId=${project.name}
         args argsNeeded
       }
     }
+
+
+    /** 
***********************************************************************************************/
+
+    // Method to create the PortableValidatesRunnerTask.
+    // The method takes PortableValidatesRunnerConfiguration as parameter.
+    project.ext.createPortableValidatesRunnerTask = {
+      /*
+       * We need to rely on manually specifying these evaluationDependsOn to 
ensure that
+       * the following projects are evaluated before we evaluate this project. 
This is because
+       * we are attempting to reference the "sourceSets.test.output" directly.
+       */
+      project.evaluationDependsOn(":beam-sdks-java-core")
+      project.evaluationDependsOn(":beam-runners-core-java")
+      def config = it ? it as PortableValidatesRunnerConfiguration : new 
PortableValidatesRunnerConfiguration()
+      def name = config.name
+      def beamTestPipelineOptions = [
+        
"--runner=org.apache.beam.runners.reference.testing.TestPortableRunner",
+        "--jobServerDriver=${config.jobServerDriver}",
+      ]
+      if (config.jobServerConfig) {
+        
beamTestPipelineOptions.add("--jobServerConfig=${config.jobServerConfig}")
+      }
+      project.tasks.create(name: name, type: Test) {
+        group = "Verification"
+        description = "Validates the PortableRunner with JobServer 
${config.jobServerDriver}"
+        systemProperty "beamTestPipelineOptions", 
JsonOutput.toJson(beamTestPipelineOptions)
+        classpath = config.testClasspathConfiguration
+        testClassesDirs = 
project.files(project.project(":beam-sdks-java-core").sourceSets.test.output.classesDirs,
 project.project(":beam-runners-core-java").sourceSets.test.output.classesDirs)
+        maxParallelForks 1
+        useJUnit(config.testCategories)
+      }
+    }
   }
 }
diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index dd5da90040d..b13426a1f3a 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -54,6 +54,7 @@ evaluationDependsOn(":beam-sdks-java-core")
 configurations {
   needsRunner
   validatesRunner
+  validatesPortableRunner
 }
 
 dependencies {
@@ -88,6 +89,13 @@ dependencies {
   validatesRunner project(path: ":beam-sdks-java-core", configuration: 
"shadowTest")
   validatesRunner project(path: project.path, configuration: "shadow")
   validatesRunner project(path: project.path, configuration: "shadowTest")
+  validatesPortableRunner project(path: 
":beam-runners-core-construction-java", configuration: "shadowTest")
+  validatesPortableRunner project(path: ":beam-runners-core-java", 
configuration: "shadowTest")
+  validatesPortableRunner project(path: ":beam-runners-java-fn-execution", 
configuration: "shadowTest")
+  validatesPortableRunner project(path: ":beam-runners-reference-java", 
configuration: "shadowTest")
+  validatesPortableRunner project(path: ":beam-sdks-java-core", configuration: 
"shadowTest")
+  validatesPortableRunner project(path: project.path, configuration: "shadow")
+  validatesPortableRunner project(path: project.path, configuration: 
"shadowTest")
 }
 
 task needsRunnerTests(type: Test) {
@@ -147,3 +155,5 @@ createJavaExamplesArchetypeValidationTask(type: 
'MobileGaming',
   gcsBucket: gcsBucket,
   bqDataset: bqDataset,
   pubsubTopic: pubsubTopic)
+
+createPortableValidatesRunnerTask(jobServerDriver: 
"org.apache.beam.runners.direct.portable.job.ReferenceRunnerJobServer", 
testClasspathConfiguration: configurations.validatesPortableRunner)
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
index 1ef3acb6c0e..207deeff9f3 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
@@ -18,17 +18,34 @@
 package org.apache.beam.runners.direct.portable.job;
 
 import java.io.IOException;
+import java.util.Arrays;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.ServerFactory;
 import org.kohsuke.args4j.CmdLineException;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A program that runs a {@link ReferenceRunnerJobService}. */
 public class ReferenceRunnerJobServer {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReferenceRunnerJobServer.class);
+  private final ServerConfiguration configuration;
+  private GrpcFnServer<ReferenceRunnerJobService> server;
+
+  private ReferenceRunnerJobServer(ServerConfiguration configuration) {
+    this.configuration = configuration;
+  }
 
   public static void main(String[] args) throws Exception {
+    try {
+      runServer(parseConfiguration(args));
+    } catch (CmdLineException ignored) {
+    }
+  }
+
+  private static ServerConfiguration parseConfiguration(String[] args) throws 
CmdLineException {
     ServerConfiguration configuration = new ServerConfiguration();
     CmdLineParser parser = new CmdLineParser(configuration);
     try {
@@ -36,9 +53,9 @@ public static void main(String[] args) throws Exception {
     } catch (CmdLineException e) {
       e.printStackTrace(System.err);
       printUsage(parser);
-      return;
+      throw e;
     }
-    runServer(configuration);
+    return configuration;
   }
 
   private static void printUsage(CmdLineParser parser) {
@@ -64,6 +81,33 @@ private static void runServer(ServerConfiguration 
configuration) throws Exceptio
     System.out.println("Server shut down, exiting");
   }
 
+  public static ReferenceRunnerJobServer fromParams(String[] args) {
+    try {
+      return new ReferenceRunnerJobServer(parseConfiguration(args));
+    } catch (CmdLineException e) {
+      throw new IllegalArgumentException(
+          "Unable to parse command line arguments " + Arrays.asList(args), e);
+    }
+  }
+
+  public String start() throws Exception {
+    ServerFactory serverFactory = ServerFactory.createDefault();
+    server =
+        createServer(configuration, serverFactory, 
ReferenceRunnerJobService.create(serverFactory));
+
+    return server.getApiServiceDescriptor().getUrl();
+  }
+
+  public void stop() {
+    if (server != null) {
+      try {
+        server.close();
+      } catch (Exception e) {
+        LOG.error("Unable to stop job server.", e);
+      }
+    }
+  }
+
   private static GrpcFnServer<ReferenceRunnerJobService> createServer(
       ServerConfiguration configuration,
       ServerFactory serverFactory,
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
index 6049b63526e..9627152c100 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
@@ -26,6 +26,7 @@
 import java.nio.file.Path;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -35,6 +36,8 @@
 import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
 import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
 import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
 import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
 import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
 import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
@@ -55,6 +58,7 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(ReferenceRunnerJobService.class);
 
   public static ReferenceRunnerJobService create(final ServerFactory 
serverFactory) {
+    LOG.info("Starting {}", ReferenceRunnerJobService.class);
     return new ReferenceRunnerJobService(
         serverFactory, () -> 
Files.createTempDirectory("reference-runner-staging"));
   }
@@ -64,7 +68,10 @@ public static ReferenceRunnerJobService create(final 
ServerFactory serverFactory
 
   private final ConcurrentMap<String, PreparingJob> unpreparedJobs;
   private final ConcurrentMap<String, ReferenceRunner> runningJobs;
+  private final ConcurrentMap<String, JobState.Enum> jobStates;
   private final ExecutorService executor;
+  private final 
ConcurrentLinkedQueue<GrpcFnServer<LocalFileSystemArtifactStagerService>>
+      artifactStagingServices;
 
   private ReferenceRunnerJobService(
       ServerFactory serverFactory, Callable<Path> stagingPathCallable) {
@@ -72,12 +79,14 @@ private ReferenceRunnerJobService(
     this.stagingPathCallable = stagingPathCallable;
     unpreparedJobs = new ConcurrentHashMap<>();
     runningJobs = new ConcurrentHashMap<>();
+    jobStates = new ConcurrentHashMap<>();
     executor =
         Executors.newCachedThreadPool(
             new ThreadFactoryBuilder()
                 .setDaemon(false)
                 .setNameFormat("reference-runner-pipeline-%s")
                 .build());
+    artifactStagingServices = new ConcurrentLinkedQueue<>();
   }
 
   public ReferenceRunnerJobService withStagingPathSupplier(Callable<Path> 
supplier) {
@@ -95,6 +104,7 @@ public void prepare(
       Path tempDir = stagingPathCallable.call();
       GrpcFnServer<LocalFileSystemArtifactStagerService> 
artifactStagingService =
           createArtifactStagingService(tempDir);
+      artifactStagingServices.add(artifactStagingService);
       PreparingJob previous =
           unpreparedJobs.putIfAbsent(
               preparationId,
@@ -156,14 +166,22 @@ public void run(
               preparingJob.getPipeline(),
               preparingJob.getOptions(),
               preparingJob.getStagingLocation().toFile());
-      String jobId = preparingJob + 
Integer.toString(ThreadLocalRandom.current().nextInt());
+      String jobId = "job-" + 
Integer.toString(ThreadLocalRandom.current().nextInt());
       
responseObserver.onNext(RunJobResponse.newBuilder().setJobId(jobId).build());
       responseObserver.onCompleted();
       runningJobs.put(jobId, runner);
+      jobStates.putIfAbsent(jobId, Enum.RUNNING);
       executor.submit(
           () -> {
-            runner.execute();
-            return null;
+            try {
+              jobStates.computeIfPresent(jobId, (id, status) -> Enum.RUNNING);
+              runner.execute();
+              jobStates.computeIfPresent(jobId, (id, status) -> Enum.DONE);
+              return null;
+            } catch (Exception e) {
+              jobStates.computeIfPresent(jobId, (id, status) -> Enum.FAILED);
+              throw e;
+            }
           });
     } catch (StatusRuntimeException e) {
       responseObserver.onError(e);
@@ -176,10 +194,11 @@ public void run(
   public void getState(
       GetJobStateRequest request, StreamObserver<GetJobStateResponse> 
responseObserver) {
     LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request);
-    responseObserver.onError(
-        Status.NOT_FOUND
-            .withDescription(String.format("Unknown Job ID %s", 
request.getJobId()))
-            .asException());
+    responseObserver.onNext(
+        GetJobStateResponse.newBuilder()
+            .setState(jobStates.getOrDefault(request.getJobId(), 
Enum.UNRECOGNIZED))
+            .build());
+    responseObserver.onCompleted();
   }
 
   @Override
@@ -200,5 +219,16 @@ public void close() throws Exception {
         LOG.warn("Exception while closing preparing job {}", preparingJob);
       }
     }
+    while (!artifactStagingServices.isEmpty()) {
+      GrpcFnServer<LocalFileSystemArtifactStagerService> 
artifactStagingService =
+          artifactStagingServices.remove();
+      try {
+        artifactStagingService.close();
+      } catch (Exception e) {
+        LOG.error(
+            "Unable to close staging sevice started on %s",
+            artifactStagingService.getApiServiceDescriptor().getUrl(), e);
+      }
+    }
   }
 }
diff --git a/runners/flink/job-server/build.gradle 
b/runners/flink/job-server/build.gradle
index aa56a475935..4059d7303fb 100644
--- a/runners/flink/job-server/build.gradle
+++ b/runners/flink/job-server/build.gradle
@@ -25,14 +25,6 @@ applyJavaNature(
   },
 )
 
-/*
- * We need to rely on manually specifying these evaluationDependsOn to ensure 
that
- * the following projects are evaluated before we evaluate this project. This 
is because
- * we are attempting to reference the "sourceSets.test.output" directly.
- */
-evaluationDependsOn(":beam-sdks-java-core")
-evaluationDependsOn(":beam-runners-core-java")
-
 description = "Apache Beam :: Runners :: Flink :: Job Server"
 
 apply plugin: "application"
@@ -40,15 +32,15 @@ apply plugin: "application"
 mainClassName = "org.apache.beam.runners.flink.FlinkJobServerDriver"
 
 configurations {
-  validatesRunner
+  validatesPortableRunner
 }
 
 dependencies {
   compile project(path: ":beam-runners-flink_2.11", configuration: "shadow")
-  validatesRunner project(path: ":beam-runners-flink_2.11", configuration: 
"shadowTest")
-  validatesRunner project(path: ":beam-sdks-java-core", configuration: 
"shadowTest")
-  validatesRunner project(path: ":beam-runners-core-java", configuration: 
"shadowTest")
-  validatesRunner project(path: ":beam-runners-reference-java", configuration: 
"shadowTest")
+  validatesPortableRunner project(path: ":beam-runners-flink_2.11", 
configuration: "shadowTest")
+  validatesPortableRunner project(path: ":beam-sdks-java-core", configuration: 
"shadowTest")
+  validatesPortableRunner project(path: ":beam-runners-core-java", 
configuration: "shadowTest")
+  validatesPortableRunner project(path: ":beam-runners-reference-java", 
configuration: "shadowTest")
   compile project(path: 
":beam-sdks-java-extensions-google-cloud-platform-core", configuration: 
"shadow")
 //  TODO: Enable AWS and HDFS file system.
 }
@@ -68,58 +60,4 @@ runShadow {
   jvmArgs = ["-Xdebug", 
"-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
 }
 
-class PortableValidatesRunnerConfig {
-  // Task name for validate runner case.
-  String name
-  // Fully qualified JobServerClass name to use.
-  String jobServerDriver
-  // A string representing the jobServer Configuration.
-  String jobServerConfig
-  // Flag to include tests for streaming or batch.
-  boolean streaming
-}
-
-def createPortableValidatesRunnerTask = {
-  def config = it ? it as PortableValidatesRunnerConfig : new 
PortableValidatesRunnerConfig()
-  tasks.create(name: config.name, type: Test) {
-    group = "Verification"
-    description = "Validates the PortableRunner with JobServer 
${config.jobServerDriver}"
-    systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
-            
"--runner=org.apache.beam.runners.reference.testing.TestPortableRunner",
-            "--jobServerDriver=${config.jobServerDriver}",
-            config.jobServerConfig ? 
"--jobServerConfig=${config.jobServerConfig}" : "",
-    ])
-    classpath = configurations.validatesRunner
-    testClassesDirs = 
files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs, 
project(":beam-runners-core-java").sourceSets.test.output.classesDirs)
-    maxParallelForks 1
-    if (config.streaming) {
-      useJUnit {
-        includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
-        excludeCategories 
'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
-        excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesSchema'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
-      }
-    } else {
-      useJUnit {
-        includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
-        excludeCategories 
'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
-        excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesSchema'
-        excludeCategories 
'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
-      }
-    }
-  }
-}
-
-createPortableValidatesRunnerTask(name: "validatesPortableRunner", 
jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver", 
jobServerConfig: "", streaming: false)
-
-task validatesRunner {
-  group = "Verification"
-  description "Validates Portable Flink runner"
-  dependsOn validatesPortableRunner
-}
+createPortableValidatesRunnerTask(jobServerDriver: 
"org.apache.beam.runners.flink.FlinkJobServerDriver", 
testClasspathConfiguration: configurations.validatesPortableRunner)
diff --git 
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
index 9431ff87aff..974ab1f4376 100644
--- 
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
+++ 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
 import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
 import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
@@ -213,8 +214,7 @@ private static StagedFile createStagingFile(File file) {
     // generally accept arbitrary artifact names.
     // NOTE: Base64 url encoding does not work here because the stage artifact 
names tend to be long
     // and exceed file length limits on the artifact stager.
-    String encodedPath = escapePath(file.getPath());
-    return StagedFile.of(file, encodedPath);
+    return StagedFile.of(file, UUID.randomUUID().toString());
   }
 
   /** Create a filename-friendly artifact name for the given path. */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 134318)
    Time Spent: 14h 10m  (was: 14h)

> Java: Portable batch runner passes all ValidatesRunner tests that 
> non-portable runner passes
> --------------------------------------------------------------------------------------------
>
>                 Key: BEAM-4176
>                 URL: https://issues.apache.org/jira/browse/BEAM-4176
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Priority: Major
>          Time Spent: 14h 10m
>  Remaining Estimate: 0h
>
> We need this as a sanity check that runner execution is correct.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to