[ 
https://issues.apache.org/jira/browse/BEAM-2885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16277751#comment-16277751
 ] 

ASF GitHub Bot commented on BEAM-2885:
--------------------------------------

tgroh closed pull request #4187: [BEAM-2885] Verify that the exposed endpoint 
stages artifacts
URL: https://github.com/apache/beam/pull/4187
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/reference/job-server/build.gradle 
b/runners/reference/job-server/build.gradle
index 1eaee9bf29f..4e959bfdca8 100644
--- a/runners/reference/job-server/build.gradle
+++ b/runners/reference/job-server/build.gradle
@@ -32,6 +32,7 @@ dependencies {
   shadow library.java.args4j
   testCompile library.java.junit
   shadowTest project(path: 
":beam-runners-parent:beam-runners-core-construction-java", configuration: 
"shadow")
+  shadowTest library.java.hamcrest_core
   shadowTest library.java.slf4j_jdk14
 }
 
diff --git a/runners/reference/job-server/pom.xml 
b/runners/reference/job-server/pom.xml
index 184b959fe95..ae288c8a78e 100644
--- a/runners/reference/job-server/pom.xml
+++ b/runners/reference/job-server/pom.xml
@@ -122,5 +122,18 @@
       <artifactId>slf4j-jdk14</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <!-- Stops the Maven Dependency Plugin from failing -->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/runners/reference/job-server/src/main/java/org/apache/beam/runners/reference/job/ReferenceRunnerJobService.java
 
b/runners/reference/job-server/src/main/java/org/apache/beam/runners/reference/job/ReferenceRunnerJobService.java
index 33ccbd7b8d8..88afab354f9 100644
--- 
a/runners/reference/job-server/src/main/java/org/apache/beam/runners/reference/job/ReferenceRunnerJobService.java
+++ 
b/runners/reference/job-server/src/main/java/org/apache/beam/runners/reference/job/ReferenceRunnerJobService.java
@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
@@ -48,18 +49,27 @@
 public class ReferenceRunnerJobService extends JobServiceImplBase implements 
FnService {
   private static final Logger LOG = 
LoggerFactory.getLogger(ReferenceRunnerJobService.class);
 
-  public static ReferenceRunnerJobService create(ServerFactory serverFactory) {
-    return new ReferenceRunnerJobService(serverFactory);
+  public static ReferenceRunnerJobService create(final ServerFactory 
serverFactory) {
+    return new ReferenceRunnerJobService(
+        serverFactory, filesTempDirectory());
   }
 
   private final ServerFactory serverFactory;
+  private final Callable<Path> stagingPathSupplier;
+
   private final ConcurrentMap<String, PreparingJob> unpreparedJobs;
 
-  private ReferenceRunnerJobService(ServerFactory serverFactory) {
+  private ReferenceRunnerJobService(
+      ServerFactory serverFactory, Callable<Path> stagingPathSupplier) {
     this.serverFactory = serverFactory;
+    this.stagingPathSupplier = stagingPathSupplier;
     unpreparedJobs = new ConcurrentHashMap<>();
   }
 
+  public ReferenceRunnerJobService withStagingPathSupplier(Callable<Path> 
supplier) {
+    return new ReferenceRunnerJobService(serverFactory, supplier);
+  }
+
   @Override
   public void prepare(
       JobApi.PrepareJobRequest request,
@@ -94,13 +104,10 @@ public void prepare(
   }
 
   private GrpcFnServer<LocalFileSystemArtifactStagerService> 
createArtifactStagingService(
-      String preparationId) throws IOException {
-    Path tempDir = Files.createTempDirectory("reference-runner-staging");
+      String preparationId) throws Exception {
     LocalFileSystemArtifactStagerService service =
-        
LocalFileSystemArtifactStagerService.withRootDirectory(tempDir.toFile());
-    GrpcFnServer<LocalFileSystemArtifactStagerService> server =
-        GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
-    return server;
+        
LocalFileSystemArtifactStagerService.withRootDirectory(stagingPathSupplier.call().toFile());
+    return GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
   }
 
   @Override
@@ -140,4 +147,12 @@ public void close() throws Exception {
       }
     }
   }
+
+  private static Callable<Path> filesTempDirectory() {
+    return new Callable<Path>() {
+      public Path call() throws IOException {
+        return Files.createTempDirectory("reference-runner-staging");
+      }
+    };
+  }
 }
diff --git 
a/runners/reference/job-server/src/test/java/org/apache/beam/runners/reference/job/ReferenceRunnerJobServiceTest.java
 
b/runners/reference/job-server/src/test/java/org/apache/beam/runners/reference/job/ReferenceRunnerJobServiceTest.java
index 62be932fc53..2c63615d110 100644
--- 
a/runners/reference/job-server/src/test/java/org/apache/beam/runners/reference/job/ReferenceRunnerJobServiceTest.java
+++ 
b/runners/reference/job-server/src/test/java/org/apache/beam/runners/reference/job/ReferenceRunnerJobServiceTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.beam.runners.reference.job;
 
+import static org.hamcrest.Matchers.hasItems;
+import static org.junit.Assert.assertThat;
+
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.Struct;
 import io.grpc.inprocess.InProcessChannelBuilder;
@@ -25,6 +28,13 @@
 import java.io.FileOutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
 import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
 import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
 import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
@@ -34,6 +44,9 @@
 import org.apache.beam.runners.core.construction.ArtifactServiceStager;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -47,7 +60,8 @@
  */
 @RunWith(JUnit4.class)
 public class ReferenceRunnerJobServiceTest {
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  @Rule public TemporaryFolder runnerTemp = new TemporaryFolder();
+  @Rule public TemporaryFolder clientTemp = new TemporaryFolder();
 
   private InProcessServerFactory serverFactory = 
InProcessServerFactory.create();
   private ReferenceRunnerJobService service;
@@ -56,7 +70,15 @@
 
   @Before
   public void setup() throws Exception {
-    service = ReferenceRunnerJobService.create(serverFactory);
+    service =
+        ReferenceRunnerJobService.create(serverFactory)
+            .withStagingPathSupplier(
+                new Callable<Path>() {
+                  @Override
+                  public Path call() throws Exception {
+                    return runnerTemp.getRoot().toPath();
+                  }
+                });
     server = GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
     stub =
         JobServiceGrpc.newBlockingStub(
@@ -85,12 +107,43 @@ public void testPrepareJob() throws Exception {
     File foo = writeTempFile("foo", "foo, bar, baz".getBytes());
     File bar = writeTempFile("spam", "spam, ham, eggs".getBytes());
     stager.stage(ImmutableList.<File>of(foo, bar));
+    List<byte[]> tempDirFiles = readFlattendFiles(runnerTemp.getRoot());
+    assertThat(
+        tempDirFiles,
+        hasItems(
+            arrayEquals(Files.readAllBytes(foo.toPath())),
+            arrayEquals(Files.readAllBytes(bar.toPath()))));
     // TODO: 'run' the job with some sort of noop backend, to verify state is 
cleaned up.
-    // TODO: Verify that the artifacts have been staged
+  }
+
+  private Matcher<byte[]> arrayEquals(final byte[] expected) {
+    return new TypeSafeMatcher<byte[]>() {
+      @Override
+      protected boolean matchesSafely(byte[] actual) {
+        return Arrays.equals(actual, expected);
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("an array equal to 
").appendValue(Arrays.toString(expected));
+      }
+    };
+  }
+
+  private List<byte[]> readFlattendFiles(File root) throws Exception {
+    if (root.isDirectory()) {
+      List<byte[]> children = new ArrayList<>();
+      for (File child : root.listFiles()) {
+        children.addAll(readFlattendFiles(child));
+      }
+      return children;
+    } else {
+      return Collections.singletonList(Files.readAllBytes(root.toPath()));
+    }
   }
 
   private File writeTempFile(String fileName, byte[] contents) throws 
Exception {
-    File file = temp.newFile(fileName);
+    File file = clientTemp.newFile(fileName);
     try (FileOutputStream stream = new FileOutputStream(file);
         FileChannel channel = stream.getChannel()) {
       channel.write(ByteBuffer.wrap(contents));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Support job+artifact APIs locally
> ---------------------------------
>
>                 Key: BEAM-2885
>                 URL: https://issues.apache.org/jira/browse/BEAM-2885
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-dataflow
>            Reporter: Henning Rohde
>            Assignee: Thomas Groh
>              Labels: portability
>
> As per https://s.apache.org/beam-job-api, use local support for 
> submission-side. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to