[ 
https://issues.apache.org/jira/browse/BEAM-7967?focusedWorklogId=307474&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307474
 ]

ASF GitHub Bot logged work on BEAM-7967:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Sep/19 22:08
            Start Date: 05/Sep/19 22:08
    Worklog Time Spent: 10m 
      Work Description: angoenka commented on pull request #9408:  [BEAM-7967] 
Execute portable Flink application jar
URL: https://github.com/apache/beam/pull/9408#discussion_r321504185
 
 

 ##########
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java
 ##########
 @@ -51,4 +83,102 @@
       ARTIFACT_STAGING_FOLDER_PATH + "/artifact-manifest.json";
   static final String PIPELINE_PATH = PIPELINE_FOLDER_PATH + "/pipeline.json";
   static final String PIPELINE_OPTIONS_PATH = PIPELINE_FOLDER_PATH + 
"/pipeline-options.json";
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PortablePipelineJarCreator.class);
+
+  private static InputStream getResourceFromClassPath(String resourcePath) 
throws IOException {
+    InputStream inputStream = 
PortablePipelineJarUtils.class.getResourceAsStream(resourcePath);
+    if (inputStream == null) {
+      throw new FileNotFoundException(
+          String.format("Resource %s not found on classpath.", resourcePath));
+    }
+    return inputStream;
+  }
+
+  /** Populates {@code builder} using the JSON resource specified by {@code 
resourcePath}. */
+  private static void parseJsonResource(String resourcePath, Builder builder) 
throws IOException {
+    try (InputStream inputStream = getResourceFromClassPath(resourcePath)) {
+      String contents = new String(ByteStreams.toByteArray(inputStream), 
StandardCharsets.UTF_8);
+      JsonFormat.parser().merge(contents, builder);
+    }
+  }
+
+  public static Pipeline getPipelineFromClasspath() throws IOException {
+    Pipeline.Builder builder = Pipeline.newBuilder();
+    parseJsonResource("/" + PIPELINE_PATH, builder);
+    return builder.build();
+  }
+
+  public static Struct getPipelineOptionsFromClasspath() throws IOException {
+    Struct.Builder builder = Struct.newBuilder();
+    parseJsonResource("/" + PIPELINE_OPTIONS_PATH, builder);
+    return builder.build();
+  }
+
+  public static ProxyManifest getArtifactManifestFromClassPath() throws 
IOException {
+    ProxyManifest.Builder builder = ProxyManifest.newBuilder();
+    parseJsonResource("/" + ARTIFACT_MANIFEST_PATH, builder);
+    return builder.build();
+  }
+
+  /** Writes artifacts listed in {@code proxyManifest}. */
+  public static String stageArtifacts(
+      ProxyManifest proxyManifest,
+      PipelineOptions options,
+      String invocationId,
+      String artifactStagingPath)
+      throws Exception {
+    Collection<StagedFile> filesToStage =
+        prepareArtifactsForStaging(proxyManifest, options, invocationId);
+    try (GrpcFnServer artifactServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            new BeamFileSystemArtifactStagingService(), 
InProcessServerFactory.create())) {
+      ManagedChannel grpcChannel =
+          InProcessManagedChannelFactory.create()
+              .forDescriptor(artifactServer.getApiServiceDescriptor());
+      ArtifactServiceStager stager = 
ArtifactServiceStager.overChannel(grpcChannel);
+      String stagingSessionToken =
+          BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+              invocationId, artifactStagingPath);
+      String retrievalToken = stager.stage(stagingSessionToken, filesToStage);
+      // Clean up.
+      for (StagedFile file : filesToStage) {
+        if (!file.getFile().delete()) {
+          LOG.warn("Failed to delete file {}", file.getFile());
+        }
+      }
+      grpcChannel.shutdown();
+      return retrievalToken;
+    }
+  }
+
+  /**
+   * Artifacts are expected to exist as resources on the classpath, located 
using {@code
+   * proxyManifest}. Write them to tmp files so they can be staged.
+   */
+  private static Collection<StagedFile> prepareArtifactsForStaging(
+      ProxyManifest proxyManifest, PipelineOptions options, String 
invocationId)
+      throws IOException {
+    List<StagedFile> filesToStage = new ArrayList<>();
+    Path outputFolderPath =
+        Paths.get(
+            MoreObjects.firstNonNull(
+                options.getTempLocation(), 
System.getProperty("java.io.tmpdir")),
+            invocationId);
+    if (!outputFolderPath.toFile().mkdir()) {
+      throw new IOException("Failed to create folder " + outputFolderPath);
+    }
+    for (Location location : proxyManifest.getLocationList()) {
+      try (InputStream inputStream = 
getResourceFromClassPath(location.getUri())) {
 
 Review comment:
   Can we use the jar directly instead of copying the artifacts to the local 
file system?
   Copying files to the local file system will also require cleanup afterwards.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 307474)
    Time Spent: 2h 20m  (was: 2h 10m)

> Execute portable Flink application jar
> --------------------------------------
>
>                 Key: BEAM-7967
>                 URL: https://issues.apache.org/jira/browse/BEAM-7967
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-flink
>            Reporter: Kyle Weaver
>            Assignee: Kyle Weaver
>            Priority: Major
>              Labels: portability-flink
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> [https://docs.google.com/document/d/1kj_9JWxGWOmSGeZ5hbLVDXSTv-zBrx4kQRqOq85RYD4/edit#heading=h.oes73844vmhl]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to