This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 81c44b4  [BEAM-7967] Execute portable Flink application jar
     new c4674b6  Merge pull request #9408 from ibzib/flink-execute-jar
81c44b4 is described below

commit 81c44b446d40eff6812f45ed7c4e78e845f2eee2
Author: Kyle Weaver <kcwea...@google.com>
AuthorDate: Tue Aug 27 13:39:02 2019 -0700

    [BEAM-7967] Execute portable Flink application jar
---
 .../job_PostCommit_PortableJar_Flink.groovy        |  38 ++++++
 runners/flink/job-server/flink_job_server.gradle   |  17 +++
 runners/flink/job-server/test_pipeline_jar.sh      | 121 +++++++++++++++++++
 .../beam/runners/flink/FlinkPipelineRunner.java    |  74 ++++++++++++
 .../jobsubmission/PortablePipelineJarUtils.java    | 130 +++++++++++++++++++++
 5 files changed, 380 insertions(+)
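For orientation, these changes let a portable pipeline jar produced by the Python SDK be executed directly with java -jar. A rough sketch of the flow the new test exercises, as shell commands (the pipeline file and jar paths here are illustrative, not taken from the commit):

    # Build a self-contained pipeline jar instead of submitting the job.
    python my_pipeline.py \
      --runner FlinkRunner \
      --flink_job_server_jar /path/to/beam-runners-flink-job-server.jar \
      --output_executable_path ./pipeline.jar

    # Run the jar to submit the pipeline to Flink.
    java -jar ./pipeline.jar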

diff --git a/.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy b/.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy
new file mode 100644
index 0000000..a2bc53e
--- /dev/null
+++ b/.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import PostcommitJobBuilder
+
+// Tests creation and execution of portable pipeline Jars on the Flink runner.
+PostcommitJobBuilder.postCommitJob('beam_PostCommit_PortableJar_Flink',
+  'Run PortableJar_Flink PostCommit', 'Flink Portable Jar Tests', this) {
+  description('Tests creation and execution of portable pipeline Jars on the Flink runner.')
+
+  // Set common parameters.
+  commonJobProperties.setTopLevelMainJobProperties(delegate)
+
+  // Gradle goals for this job.
+  steps {
+    gradle {
+      rootBuildScriptDir(commonJobProperties.checkoutDir)
+      tasks(':runners:flink:1.8:job-server:testPipelineJar')
+      commonJobProperties.setGradleSwitches(delegate)
+    }
+  }
+}
diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle
index 0f555fb..3789007 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -177,3 +177,20 @@ project.ext.validatesCrossLanguageRunner = createCrossLanguageValidatesRunnerTas
     "--shutdownSourcesOnFinalWatermark",
   ]
 )
+
+task testPipelineJar() {
+  dependsOn shadowJar
+  dependsOn ":sdks:python:container:py35:docker"
+  doLast {
+    exec {
+      executable "sh"
+      def options = [
+        "--flink_job_server_jar ${shadowJar.archivePath}",
+        "--env_dir 
${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}",
+        "--python_root_dir ${project.rootDir}/sdks/python",
+        "--python_version 3.5"
+      ]
+      args "-c", "../../job-server/test_pipeline_jar.sh ${options.join(' ')}"
+    }
+  }
+}
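The task above shells out to test_pipeline_jar.sh; invoked by hand, the equivalent command would look roughly like this (a sketch with illustrative paths, mirroring the options the task assembles):

    sh -c "../../job-server/test_pipeline_jar.sh \
      --flink_job_server_jar build/libs/flink-job-server-shadow.jar \
      --env_dir build/gradleenv/123456789 \
      --python_root_dir ../../../sdks/python \
      --python_version 3.5"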
diff --git a/runners/flink/job-server/test_pipeline_jar.sh b/runners/flink/job-server/test_pipeline_jar.sh
new file mode 100755
index 0000000..c59facf
--- /dev/null
+++ b/runners/flink/job-server/test_pipeline_jar.sh
@@ -0,0 +1,121 @@
+#!/bin/bash
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+
+set -e
+set -v
+
+while [[ $# -gt 0 ]]
+do
+key="$1"
+case $key in
+    --flink_job_server_jar)
+        FLINK_JOB_SERVER_JAR="$2"
+        shift # past argument
+        shift # past value
+        ;;
+    --env_dir)
+        ENV_DIR="$2"
+        shift # past argument
+        shift # past value
+        ;;
+    --python_root_dir)
+        PYTHON_ROOT_DIR="$2"
+        shift # past argument
+        shift # past value
+        ;;
+    --python_version)
+        PYTHON_VERSION="$2"
+        shift # past argument
+        shift # past value
+        ;;
+    *)    # unknown option
+        echo "Unknown option: $1"
+        exit 1
+        ;;
+esac
+done
+
+# Go to the root of the repository
+cd $(git rev-parse --show-toplevel)
+
+# Verify docker command exists
+command -v docker
+docker -v
+
+CONTAINER=$USER-docker-apache.bintray.io/beam/python$PYTHON_VERSION
+TAG=latest
+# Verify container has already been built
+docker images $CONTAINER:$TAG | grep $TAG
+
+# Set up Python environment
+virtualenv -p python$PYTHON_VERSION $ENV_DIR
+. $ENV_DIR/bin/activate
+pip install --retries 10 -e $PYTHON_ROOT_DIR
+
+PIPELINE_PY="
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms import Create
+from apache_beam.transforms import Map
+
+# To test that our main session is getting plumbed through artifact staging
+# correctly, create a global variable. If the main session is not plumbed
+# through properly, global_var will be undefined and the pipeline will fail.
+global_var = 1
+
+pipeline_options = PipelineOptions()
+pipeline_options.view_as(SetupOptions).save_main_session = True
+pipeline = beam.Pipeline(options=pipeline_options)
+pcoll = (pipeline
+         | Create([0, 1, 2])
+         | Map(lambda x: x + global_var))
+assert_that(pcoll, equal_to([1, 2, 3]))
+
+result = pipeline.run()
+result.wait_until_finish()
+"
+
+# Create the jar
+OUTPUT_JAR=flink-test-$(date +%Y%m%d-%H%M%S).jar
+(python -c "$PIPELINE_PY" \
+  --runner FlinkRunner \
+  --flink_job_server_jar $FLINK_JOB_SERVER_JAR \
+  --output_executable_path $OUTPUT_JAR \
+  --parallelism 1 \
+  --sdk_worker_parallelism 1 \
+  --environment_type DOCKER \
+  --environment_config=$CONTAINER:$TAG \
+) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting
+
+if [[ "$TEST_EXIT_CODE" -eq 0 ]]; then
+  # Execute the jar
+  java -jar $OUTPUT_JAR || TEST_EXIT_CODE=$?
+fi
+
+rm -rf $ENV_DIR
+rm -f $OUTPUT_JAR
+
+if [[ "$TEST_EXIT_CODE" -eq 0 ]]; then
+  echo ">>> SUCCESS"
+else
+  echo ">>> FAILURE"
+fi
+exit $TEST_EXIT_CODE
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index e61cb12..f33af5c0 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -17,25 +17,37 @@
  */
 package org.apache.beam.runners.flink;
 
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
 import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.hasUnboundedPCollections;
 
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
+import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarUtils;
 import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
 import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.metrics.MetricsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.client.program.DetachedEnvironment;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,4 +132,66 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
       return flinkRunnerResult;
     }
   }
+
+  /**
+   * Main method to be called only as the entry point to an executable jar with structure as defined
+   * in {@link PortablePipelineJarUtils}.
+   */
+  public static void main(String[] args) throws Exception {
+    // Register standard file systems.
+    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
+
+    FlinkPipelineRunnerConfiguration configuration = parseArgs(args);
+    Pipeline pipeline = PortablePipelineJarUtils.getPipelineFromClasspath();
+    Struct options = PortablePipelineJarUtils.getPipelineOptionsFromClasspath();
+    FlinkPipelineOptions flinkOptions =
+        PipelineOptionsTranslation.fromProto(options).as(FlinkPipelineOptions.class);
+    String invocationId =
+        String.format("%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
+    ProxyManifest proxyManifest = PortablePipelineJarUtils.getArtifactManifestFromClassPath();
+    String retrievalToken =
+        PortablePipelineJarUtils.stageArtifacts(
+            proxyManifest, flinkOptions, invocationId, configuration.artifactStagingPath);
+
+    FlinkPipelineRunner runner =
+        new FlinkPipelineRunner(
+            flinkOptions,
+            configuration.flinkConfDir,
+            detectClassPathResourcesToStage(FlinkPipelineRunner.class.getClassLoader()));
+    JobInfo jobInfo =
+        JobInfo.create(invocationId, flinkOptions.getJobName(), retrievalToken, options);
+    try {
+      runner.run(pipeline, jobInfo);
+    } catch (Exception e) {
+      throw new RuntimeException(String.format("Job %s failed.", invocationId), e);
+    }
+    LOG.info("Job {} finished successfully.", invocationId);
+  }
+
+  private static class FlinkPipelineRunnerConfiguration {
+    @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
+    private String artifactStagingPath =
+        Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();
+
+    @Option(
+        name = "--flink-conf-dir",
+        usage =
+            "Directory containing Flink YAML configuration files. "
+                + "These properties will be set to all jobs submitted to Flink 
and take precedence "
+                + "over configurations in FLINK_CONF_DIR.")
+    private String flinkConfDir = null;
+  }
+
+  private static FlinkPipelineRunnerConfiguration parseArgs(String[] args) {
+    FlinkPipelineRunnerConfiguration configuration = new FlinkPipelineRunnerConfiguration();
+    CmdLineParser parser = new CmdLineParser(configuration);
+    try {
+      parser.parseArgument(args);
+    } catch (CmdLineException e) {
+      LOG.error("Unable to parse command line arguments.", e);
+      parser.printUsage(System.err);
+      throw new IllegalArgumentException("Unable to parse command line arguments.", e);
+    }
+    return configuration;
+  }
 }
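For reference, a sketch of invoking this new entry point directly with its two optional flags (the jar name and directories are illustrative; the defaults match the @Option declarations above):

    java -jar pipeline.jar \
      --artifacts-dir /tmp/beam-artifact-staging \
      --flink-conf-dir /opt/flink/conf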
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java
index af712ad..5045f3b 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java
@@ -17,6 +17,38 @@
  */
 package org.apache.beam.runners.fnexecution.jobsubmission;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest.Location;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager.StagedFile;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Message.Builder;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.util.JsonFormat;
+import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
+import org.apache.commons.compress.utils.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Contains common code for writing and reading portable pipeline jars.
  *
@@ -51,4 +83,102 @@ public abstract class PortablePipelineJarUtils {
       ARTIFACT_STAGING_FOLDER_PATH + "/artifact-manifest.json";
   static final String PIPELINE_PATH = PIPELINE_FOLDER_PATH + "/pipeline.json";
   static final String PIPELINE_OPTIONS_PATH = PIPELINE_FOLDER_PATH + "/pipeline-options.json";
+
+  private static final Logger LOG = LoggerFactory.getLogger(PortablePipelineJarUtils.class);
+
+  private static InputStream getResourceFromClassPath(String resourcePath) throws IOException {
+    InputStream inputStream = PortablePipelineJarUtils.class.getResourceAsStream(resourcePath);
+    if (inputStream == null) {
+      throw new FileNotFoundException(
+          String.format("Resource %s not found on classpath.", resourcePath));
+    }
+    return inputStream;
+  }
+
+  /** Populates {@code builder} using the JSON resource specified by {@code resourcePath}. */
+  private static void parseJsonResource(String resourcePath, Builder builder) throws IOException {
+    try (InputStream inputStream = getResourceFromClassPath(resourcePath)) {
+      String contents = new String(ByteStreams.toByteArray(inputStream), StandardCharsets.UTF_8);
+      JsonFormat.parser().merge(contents, builder);
+    }
+  }
+
+  public static Pipeline getPipelineFromClasspath() throws IOException {
+    Pipeline.Builder builder = Pipeline.newBuilder();
+    parseJsonResource("/" + PIPELINE_PATH, builder);
+    return builder.build();
+  }
+
+  public static Struct getPipelineOptionsFromClasspath() throws IOException {
+    Struct.Builder builder = Struct.newBuilder();
+    parseJsonResource("/" + PIPELINE_OPTIONS_PATH, builder);
+    return builder.build();
+  }
+
+  public static ProxyManifest getArtifactManifestFromClassPath() throws IOException {
+    ProxyManifest.Builder builder = ProxyManifest.newBuilder();
+    parseJsonResource("/" + ARTIFACT_MANIFEST_PATH, builder);
+    return builder.build();
+  }
+
+  /** Writes artifacts listed in {@code proxyManifest}. */
+  public static String stageArtifacts(
+      ProxyManifest proxyManifest,
+      PipelineOptions options,
+      String invocationId,
+      String artifactStagingPath)
+      throws Exception {
+    Collection<StagedFile> filesToStage =
+        prepareArtifactsForStaging(proxyManifest, options, invocationId);
+    try (GrpcFnServer artifactServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            new BeamFileSystemArtifactStagingService(), InProcessServerFactory.create())) {
+      ManagedChannel grpcChannel =
+          InProcessManagedChannelFactory.create()
+              .forDescriptor(artifactServer.getApiServiceDescriptor());
+      ArtifactServiceStager stager = ArtifactServiceStager.overChannel(grpcChannel);
+      String stagingSessionToken =
+          BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+              invocationId, artifactStagingPath);
+      String retrievalToken = stager.stage(stagingSessionToken, filesToStage);
+      // Clean up.
+      for (StagedFile file : filesToStage) {
+        if (!file.getFile().delete()) {
+          LOG.warn("Failed to delete file {}", file.getFile());
+        }
+      }
+      grpcChannel.shutdown();
+      return retrievalToken;
+    }
+  }
+
+  /**
+   * Artifacts are expected to exist as resources on the classpath, located using {@code
+   * proxyManifest}. Write them to tmp files so they can be staged.
+   */
+  private static Collection<StagedFile> prepareArtifactsForStaging(
+      ProxyManifest proxyManifest, PipelineOptions options, String invocationId)
+      throws IOException {
+    List<StagedFile> filesToStage = new ArrayList<>();
+    Path outputFolderPath =
+        Paths.get(
+            MoreObjects.firstNonNull(
+                options.getTempLocation(), System.getProperty("java.io.tmpdir")),
+            invocationId);
+    if (!outputFolderPath.toFile().mkdir()) {
+      throw new IOException("Failed to create folder " + outputFolderPath);
+    }
+    for (Location location : proxyManifest.getLocationList()) {
+      try (InputStream inputStream = getResourceFromClassPath(location.getUri())) {
+        Path outputPath = outputFolderPath.resolve(UUID.randomUUID().toString());
+        LOG.trace("Writing artifact {} to file {}", location.getName(), outputPath);
+        File file = outputPath.toFile();
+        try (FileOutputStream outputStream = new FileOutputStream(file)) {
+          IOUtils.copy(inputStream, outputStream);
+          filesToStage.add(StagedFile.of(file, location.getName()));
+        }
+      }
+    }
+    return filesToStage;
+  }
 }
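As a reading aid: the helpers above resolve the pipeline, its options, and the artifact manifest as classpath resources inside the jar. A sketch of inspecting a generated jar follows; the folder names come from PIPELINE_FOLDER_PATH and ARTIFACT_STAGING_FOLDER_PATH, whose values are not visible in this diff, so the entries shown are assumptions:

    jar tf pipeline.jar
    # Expected to include, per the constants above (folder names assumed):
    #   <PIPELINE_FOLDER_PATH>/pipeline.json
    #   <PIPELINE_FOLDER_PATH>/pipeline-options.json
    #   <ARTIFACT_STAGING_FOLDER_PATH>/artifact-manifest.json
    #   ...plus the artifact files referenced by the manifest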
