[ 
https://issues.apache.org/jira/browse/BEAM-8312?focusedWorklogId=320909&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320909
 ]

ASF GitHub Bot logged work on BEAM-8312:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Sep/19 22:09
            Start Date: 30/Sep/19 22:09
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on pull request #9681: [BEAM-8312] 
ArtifactRetrievalService serving artifacts from jar.
URL: https://github.com/apache/beam/pull/9681#discussion_r329808387
 
 

 ##########
 File path: 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/ClassLoaderArtifactServiceTest.java
 ##########
 @@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import java.io.ByteArrayOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.Charset;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ManagedChannel;
+import 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ClassLoaderArtifactRetrievalService} and {@link
+ * JavaFilesystemArtifactStagingService}.
+ */
+@RunWith(JUnit4.class)
+public class ClassLoaderArtifactServiceTest {
+
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private static final int ARTIFACT_CHUNK_SIZE = 100;
+
+  private static final Charset BIJECTIVE_CHARSET = Charsets.ISO_8859_1;
+
+  /**
+   * A staging service and a retrieval service exercised together by these tests, exposed through
+   * the asynchronous and blocking gRPC stubs of each.
+   */
+  public interface ArtifactServicePair extends AutoCloseable {
+
+    /** Returns the staging session token to use for the given nonce. */
+    String getStagingToken(String nonce);
+
+    /** Returns the asynchronous stub of the staging service. */
+    ArtifactStagingServiceGrpc.ArtifactStagingServiceStub createStagingStub() throws Exception;
+
+    /** Returns the blocking stub of the staging service. */
+    ArtifactStagingServiceGrpc.ArtifactStagingServiceBlockingStub createStagingBlockingStub()
+        throws Exception;
+
+    /** Returns the asynchronous stub of the retrieval service. */
+    ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceStub createRetrievalStub()
+        throws Exception;
+
+    /** Returns the blocking stub of the retrieval service. */
+    ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub createRetrievalBlockingStub()
+        throws Exception;
+  }
+
+  /**
+   * Creates a service pair that stages artifacts into a jar (zip) file through {@link
+   * JavaFilesystemArtifactStagingService} and serves them back through {@link
+   * ClassLoaderArtifactRetrievalService} backed by a {@link URLClassLoader} over that jar.
+   *
+   * <p>Each service is started on the first request for one of its stubs.
+   *
+   * @throws IOException if the backing jar file cannot be created in the temporary folder
+   */
+  private ArtifactServicePair classLoaderService() throws IOException {
+    return new ArtifactServicePair() {
+
+      JavaFilesystemArtifactStagingService stagingService;
+      GrpcFnServer<JavaFilesystemArtifactStagingService> stagingServer;
+      ClassLoaderArtifactRetrievalService retrievalService;
+      GrpcFnServer<ClassLoaderArtifactRetrievalService> retrievalServer;
+
+      ArtifactStagingServiceGrpc.ArtifactStagingServiceStub stagingStub;
+      ArtifactStagingServiceGrpc.ArtifactStagingServiceBlockingStub stagingBlockingStub;
+      ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceStub retrievalStub;
+      ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub retrievalBlockingStub;
+
+      Path jarPath = Paths.get(tempFolder.newFile("jar.jar").getPath());
+      FileSystem jarFilesystem;
+
+      /** Shuts down whichever servers and services were actually started. */
+      @Override
+      public void close() throws Exception {
+        if (stagingServer != null) {
+          stagingServer.close();
+        }
+        if (stagingService != null) {
+          stagingService.close();
+        }
+        if (retrievalServer != null) {
+          retrievalServer.close();
+        }
+        if (retrievalService != null) {
+          retrievalService.close();
+        }
+      }
+
+      @Override
+      public String getStagingToken(String nonce) {
+        return "/path/to/subdir" + nonce.hashCode();
+      }
+
+      /**
+       * Writes a one-entry jar, opens it as a zip filesystem, and starts an in-process staging
+       * server on top of it.
+       */
+      private void startStagingService() throws Exception {
+        try (FileOutputStream fileOut = new FileOutputStream(jarPath.toString());
+            ZipOutputStream zipOut = new ZipOutputStream(fileOut)) {
+          zipOut.putNextEntry(new ZipEntry("someFile"));
+          zipOut.write(new byte[] {'s', 't', 'u', 'f', 'f'});
+          zipOut.closeEntry();
+        }
+        jarFilesystem =
+            FileSystems.newFileSystem(
+                URI.create("jar:file:" + jarPath.toString()), ImmutableMap.of());
+        // Assign the fields (not shadowing locals) so that close() can shut these down.
+        stagingService = new JavaFilesystemArtifactStagingService(jarFilesystem, "/path/to/root");
+        stagingServer =
+            GrpcFnServer.allocatePortAndCreateFor(stagingService, InProcessServerFactory.create());
+        ManagedChannel stagingChannel =
+            InProcessChannelBuilder.forName(stagingServer.getApiServiceDescriptor().getUrl())
+                .build();
+        stagingStub = ArtifactStagingServiceGrpc.newStub(stagingChannel);
+        stagingBlockingStub = ArtifactStagingServiceGrpc.newBlockingStub(stagingChannel);
+      }
+
+      @Override
+      public ArtifactStagingServiceGrpc.ArtifactStagingServiceStub createStagingStub()
+          throws Exception {
+        if (stagingStub == null) {
+          startStagingService();
+        }
+        return stagingStub;
+      }
+
+      @Override
+      public ArtifactStagingServiceGrpc.ArtifactStagingServiceBlockingStub
+          createStagingBlockingStub() throws Exception {
+        if (stagingBlockingStub == null) {
+          startStagingService();
+        }
+        return stagingBlockingStub;
+      }
+
+      /**
+       * Closes the staging filesystem and starts an in-process retrieval server that reads the
+       * jar through a {@link URLClassLoader}.
+       */
+      private void startupRetrievalService() throws Exception {
+        // The filesystem only exists once staging has started; presumably closing it is what
+        // makes the staged entries visible in the jar file read by the class loader below.
+        if (jarFilesystem != null) {
+          jarFilesystem.close();
+        }
+        retrievalService =
+            new ClassLoaderArtifactRetrievalService(
+                new URLClassLoader(new URL[] {jarPath.toUri().toURL()}));
+        retrievalServer =
+            GrpcFnServer.allocatePortAndCreateFor(
+                retrievalService, InProcessServerFactory.create());
+        ManagedChannel retrievalChannel =
+            InProcessChannelBuilder.forName(retrievalServer.getApiServiceDescriptor().getUrl())
+                .build();
+        retrievalStub = ArtifactRetrievalServiceGrpc.newStub(retrievalChannel);
+        retrievalBlockingStub = ArtifactRetrievalServiceGrpc.newBlockingStub(retrievalChannel);
+      }
+
+      @Override
+      public ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceStub createRetrievalStub()
+          throws Exception {
+        if (retrievalStub == null) {
+          startupRetrievalService();
+        }
+        return retrievalStub;
+      }
+
+      @Override
+      public ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub
+          createRetrievalBlockingStub() throws Exception {
+        if (retrievalBlockingStub == null) {
+          startupRetrievalService();
+        }
+        return retrievalBlockingStub;
+      }
+    };
+  }
+
+  /**
+   * Uploads {@code contents} as an artifact called {@code name} through the streaming {@code
+   * putArtifact} call: one metadata message, then the bytes in chunks of at most {@link
+   * #ARTIFACT_CHUNK_SIZE}.
+   *
+   * @param stagingStub asynchronous staging stub to upload through
+   * @param stagingSessionToken staging session the artifact belongs to
+   * @param name artifact name recorded in the returned metadata
+   * @param contents artifact payload, encoded with {@link #BIJECTIVE_CHARSET}
+   * @return the metadata that was sent for the artifact
+   * @throws ExecutionException if the service reports an error for the upload
+   * @throws TimeoutException if the service does not complete the call within 10 seconds
+   */
+  private ArtifactApi.ArtifactMetadata putArtifact(
+      ArtifactStagingServiceGrpc.ArtifactStagingServiceStub stagingStub,
+      String stagingSessionToken,
+      String name,
+      String contents)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    ArtifactApi.ArtifactMetadata metadata =
+        ArtifactApi.ArtifactMetadata.newBuilder().setName(name).build();
+    CompletableFuture<Void> complete = new CompletableFuture<>();
+    StreamObserver<ArtifactApi.PutArtifactRequest> outputStreamObserver =
+        stagingStub.putArtifact(
+            new StreamObserver<ArtifactApi.PutArtifactResponse>() {
+
+              @Override
+              public void onNext(ArtifactApi.PutArtifactResponse putArtifactResponse) {
+                // Do nothing.
+              }
+
+              @Override
+              public void onError(Throwable th) {
+                complete.completeExceptionally(th);
+              }
+
+              @Override
+              public void onCompleted() {
+                complete.complete(null);
+              }
+            });
+    // The first message of the stream carries the metadata and session token.
+    outputStreamObserver.onNext(
+        ArtifactApi.PutArtifactRequest.newBuilder()
+            .setMetadata(
+                ArtifactApi.PutArtifactMetadata.newBuilder()
+                    .setMetadata(metadata)
+                    .setStagingSessionToken(stagingSessionToken))
+            .build());
+
+    byte[] byteContents = contents.getBytes(BIJECTIVE_CHARSET);
+    for (int start = 0; start < byteContents.length; start += ARTIFACT_CHUNK_SIZE) {
+      // The last chunk may be shorter than ARTIFACT_CHUNK_SIZE.
+      int length = Math.min(byteContents.length - start, ARTIFACT_CHUNK_SIZE);
+      outputStreamObserver.onNext(
+          ArtifactApi.PutArtifactRequest.newBuilder()
+              .setData(
+                  ArtifactApi.ArtifactChunk.newBuilder()
+                      .setData(ByteString.copyFrom(byteContents, start, length))
+                      .build())
+              .build());
+    }
+    outputStreamObserver.onCompleted();
+    complete.get(10, TimeUnit.SECONDS);
+    return metadata;
+  }
+
+  /**
+   * Commits a manifest listing {@code artifacts} for the given staging session.
+   *
+   * @return the retrieval token carried by the commit response
+   */
+  private String commitManifest(
+      ArtifactStagingServiceGrpc.ArtifactStagingServiceBlockingStub stagingStub,
+      String stagingToken,
+      List<ArtifactApi.ArtifactMetadata> artifacts) {
+    ArtifactApi.CommitManifestRequest.Builder request =
+        ArtifactApi.CommitManifestRequest.newBuilder();
+    request.setStagingSessionToken(stagingToken);
+    request.setManifest(ArtifactApi.Manifest.newBuilder().addAllArtifact(artifacts));
+    ArtifactApi.CommitManifestResponse response = stagingStub.commitManifest(request.build());
+    return response.getRetrievalToken();
+  }
+
+  private String getArtifact(
+      ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceStub retrievalStub,
+      String retrievalToken,
+      String name)
+      throws ExecutionException, InterruptedException {
+    CompletableFuture<String> result = new CompletableFuture<>();
+    retrievalStub.getArtifact(
+        ArtifactApi.GetArtifactRequest.newBuilder()
+            .setRetrievalToken(retrievalToken)
+            .setName(name)
+            .build(),
+        new StreamObserver<ArtifactApi.ArtifactChunk>() {
+
+          private ByteArrayOutputStream all = new ByteArrayOutputStream();
+
+          @Override
+          public void onNext(ArtifactApi.ArtifactChunk artifactChunk) {
+            try {
+              System.err.println("GOT " + artifactChunk.getData());
+              all.write(artifactChunk.getData().toByteArray());
+            } catch (IOException exn) {
+              Assert.fail("ByteArrayOutputStream threw exception: " + exn);
+            }
+          }
+
+          @Override
+          public void onError(Throwable th) {
+            result.completeExceptionally(th);
+          }
+
+          @Override
+          public void onCompleted() {
+            result.complete(new String(all.toByteArray(), BIJECTIVE_CHARSET));
+          }
+        });
+    return result.get();
+  }
+
+  /**
+   * Uploads every entry of {@code artifacts} (name to contents) under {@code stagingToken} and
+   * then commits a manifest listing them.
+   *
+   * @return the retrieval token returned by the manifest commit
+   */
+  private String stageArtifacts(
+      ArtifactServicePair service, String stagingToken, Map<String, String> artifacts)
+      throws Exception {
+    ArtifactStagingServiceGrpc.ArtifactStagingServiceStub asyncStub = service.createStagingStub();
+    ArtifactStagingServiceGrpc.ArtifactStagingServiceBlockingStub blockingStub =
+        service.createStagingBlockingStub();
+    List<ArtifactApi.ArtifactMetadata> staged = new ArrayList<>();
+    for (Map.Entry<String, String> artifact : artifacts.entrySet()) {
+      String artifactName = artifact.getKey();
+      String artifactContents = artifact.getValue();
+      staged.add(putArtifact(asyncStub, stagingToken, artifactName, artifactContents));
+    }
+    return commitManifest(blockingStub, stagingToken, staged);
+  }
+
+  /**
+   * Asserts that the manifest served for {@code retrievalToken} lists as many artifacts as
+   * {@code artifacts} and that each listed artifact's contents equal the expected contents.
+   *
+   * @param service service pair whose retrieval stubs are queried
+   * @param retrievalToken token returned when the manifest was committed
+   * @param artifacts expected artifacts, name to contents
+   */
+  private void checkArtifacts(
+      ArtifactServicePair service, String retrievalToken, Map<String, String> artifacts)
+      throws Exception {
+    ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceStub retrievalStub =
+        service.createRetrievalStub();
+    ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub retrievalBlockingStub =
+        service.createRetrievalBlockingStub();
+    ArtifactApi.Manifest manifest =
+        retrievalBlockingStub
+            .getManifest(
+                ArtifactApi.GetManifestRequest.newBuilder()
+                    .setRetrievalToken(retrievalToken)
+                    .build())
+            .getManifest();
+    // JUnit's assertEquals takes (expected, actual); the expected count comes from the input map.
+    Assert.assertEquals(artifacts.size(), manifest.getArtifactCount());
+    for (ArtifactApi.ArtifactMetadata artifact : manifest.getArtifactList()) {
+      String contents = getArtifact(retrievalStub, retrievalToken, artifact.getName());
+      Assert.assertEquals(artifacts.get(artifact.getName()), contents);
+    }
+  }
+
+  /**
+   * Stages {@code artifacts} through {@code service} and then checks that the same artifacts can
+   * be retrieved again.
+   */
+  private void runTest(ArtifactServicePair service, Map<String, String> artifacts)
+      throws Exception {
+    String stagingToken = service.getStagingToken("nonce");
+    String retrievalToken = stageArtifacts(service, stagingToken, artifacts);
+    checkArtifacts(service, retrievalToken, artifacts);
+  }
+
+  private Map<String, String> identiyMap(String... keys) {
 
 Review comment:
   Done.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 320909)
    Time Spent: 50m  (was: 40m)

> Flink portable pipeline jars do not need to stage artifacts remotely
> --------------------------------------------------------------------
>
>                 Key: BEAM-8312
>                 URL: https://issues.apache.org/jira/browse/BEAM-8312
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Kyle Weaver
>            Assignee: Kyle Weaver
>            Priority: Major
>              Labels: portability-flink
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Currently, Flink job jars re-stage all artifacts at runtime (on the 
> JobManager) by using the usual BeamFileSystemArtifactRetrievalService [1]. 
> However, since the manifest and all the artifacts live on the classpath of 
> the jar, and everything from the classpath is copied to the Flink workers 
> anyway [2], it should not be necessary to do additional artifact staging. We 
> could replace BeamFileSystemArtifactRetrievalService in this case with a 
> simple ArtifactRetrievalService that just pulls the artifacts from the 
> classpath.
>  
>  [1] 
> [https://github.com/apache/beam/blob/340c3202b1e5824b959f5f9f626e4c7c7842a3cb/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java]
> [2] 
> [https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L93]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to