This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 27f4836  [BEAM-10762] Fix artifact staging bug in Flink/Spark uber jar runners.
     new d52f545  Merge pull request #12803 from ibzib/BEAM-10762
27f4836 is described below

commit 27f4836be06f38daf5741847c77e557b8b26a201
Author: Kyle Weaver <[email protected]>
AuthorDate: Wed Sep 9 16:45:00 2020 -0700

    [BEAM-10762] Fix artifact staging bug in Flink/Spark uber jar runners.
    
    - Replace the obsolete artifact staging token with the NO ARTIFACTS placeholder.
    
    - Add a test that verifies pipelines pass when no artifacts are staged.
---
 runners/flink/job-server/flink_job_server.gradle   | 12 +++--
 runners/flink/job-server/test_flink_uber_jar.sh    | 60 +++++++++++++++-------
 .../beam/runners/flink/FlinkPipelineRunner.java    | 18 ++++---
 .../jobsubmission/PortablePipelineJarUtils.java    |  6 ---
 .../beam/runners/spark/SparkPipelineRunner.java    | 18 ++++---
 .../beam/sdk/options/PortablePipelineOptions.java  | 14 -----
 .../sdk/options/PortablePipelineOptionsTest.java   |  3 --
 7 files changed, 71 insertions(+), 60 deletions(-)

diff --git a/runners/flink/job-server/flink_job_server.gradle 
b/runners/flink/job-server/flink_job_server.gradle
index 5bebc7a..556857b 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -257,9 +257,9 @@ task miniCluster(type: Jar, dependsOn: shadowJar) {
   zip64 true // jar needs to contain more than 65535 files
 }
 
-def addTestFlinkUberJarPy(String pyVersion) {
+def addTestFlinkUberJarPy(String pyVersion, boolean saveMainSession) {
   def pyBuildPath = pyVersion.startsWith("2") ? "2" : 
pyVersion.replaceAll("\\.", "")
-  project.tasks.create(name: "testFlinkUberJarPy${pyBuildPath}") {
+  project.tasks.create(name: 
"testFlinkUberJarPy${pyBuildPath}${saveMainSession ? 'saveMainSession' : ''}") {
     dependsOn miniCluster
     dependsOn shadowJar
     dependsOn ":sdks:python:container:py${pyBuildPath}:docker"
@@ -276,6 +276,9 @@ def addTestFlinkUberJarPy(String pyVersion) {
                         + "${project.docker_image_default_repo_prefix}"
                         + "python${pyVersion}_sdk:${project.sdk_version}",
         ]
+        if (saveMainSession) {
+          options.add('--save_main_session')
+        }
         args "-c", "../../job-server/test_flink_uber_jar.sh ${options.join(' 
')}"
       }
     }
@@ -284,11 +287,14 @@ def addTestFlinkUberJarPy(String pyVersion) {
 
 ["2.7", "3.5", "3.6", "3.7"].each{ pyVersion ->
   addTestJavaJarCreator(pyVersion)
-  addTestFlinkUberJarPy(pyVersion)
+  addTestFlinkUberJarPy(pyVersion, false)
+  addTestFlinkUberJarPy(pyVersion, true)
 }
 
 task testPipelineJar() {
   dependsOn testJavaJarCreatorPy37
   dependsOn testFlinkUberJarPy36
+  dependsOn testFlinkUberJarPy36saveMainSession
   dependsOn testFlinkUberJarPy37
+  dependsOn testFlinkUberJarPy37saveMainSession
 }
diff --git a/runners/flink/job-server/test_flink_uber_jar.sh 
b/runners/flink/job-server/test_flink_uber_jar.sh
index ace5da9..1af7da6 100755
--- a/runners/flink/job-server/test_flink_uber_jar.sh
+++ b/runners/flink/job-server/test_flink_uber_jar.sh
@@ -21,6 +21,8 @@
 set -e
 set -v
 
+SAVE_MAIN_SESSION=0
+
 while [[ $# -gt 0 ]]
 do
 key="$1"
@@ -55,6 +57,10 @@ case $key in
         shift # past argument
         shift # past value
         ;;
+    --save_main_session)
+        SAVE_MAIN_SESSION=1
+        shift # past value
+        ;;
     *)    # unknown option
         echo "Unknown option: $1"
         exit 1
@@ -105,32 +111,50 @@ from apache_beam.transforms import Map
 
 logging.basicConfig(level=logging.INFO)
 
-# To test that our main session is getting plumbed through artifact staging
-# correctly, create a global variable. If the main session is not plumbed
-# through properly, global_var will be undefined and the pipeline will fail.
-global_var = 1
-
 pipeline_options = PipelineOptions()
-pipeline_options.view_as(SetupOptions).save_main_session = True
 pipeline = beam.Pipeline(options=pipeline_options)
-pcoll = (pipeline
-         | Create([0, 1, 2])
-         | Map(lambda x: x + global_var))
+
+if pipeline_options.view_as(SetupOptions).save_main_session:
+  # To test that our main session is getting plumbed through artifact staging
+  # correctly, create a global variable. If the main session is not plumbed
+  # through properly, global_var will be undefined and the pipeline will fail.
+  global_var = 1
+  pcoll = (pipeline
+           | Create([0, 1, 2])
+           | Map(lambda x: x + global_var))
+else:
+  pcoll = (pipeline
+           | Create([0, 1, 2])
+           | Map(lambda x: x + 1))
+
 assert_that(pcoll, equal_to([1, 2, 3]))
 
 result = pipeline.run()
 result.wait_until_finish()
 "
 
-(python -c "$PIPELINE_PY" \
-  --runner FlinkRunner \
-  --flink_job_server_jar "$FLINK_JOB_SERVER_JAR" \
-  --parallelism 1 \
-  --environment_type DOCKER \
-  --environment_config "$PYTHON_CONTAINER_IMAGE" \
-  --flink_master "localhost:$FLINK_PORT" \
-  --flink_submit_uber_jar \
-) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting
+if [[ "$SAVE_MAIN_SESSION" -eq 0 ]]; then
+  (python -c "$PIPELINE_PY" \
+    --runner FlinkRunner \
+    --flink_job_server_jar "$FLINK_JOB_SERVER_JAR" \
+    --parallelism 1 \
+    --environment_type DOCKER \
+    --environment_config "$PYTHON_CONTAINER_IMAGE" \
+    --flink_master "localhost:$FLINK_PORT" \
+    --flink_submit_uber_jar \
+  ) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting
+else
+  (python -c "$PIPELINE_PY" \
+    --runner FlinkRunner \
+    --flink_job_server_jar "$FLINK_JOB_SERVER_JAR" \
+    --parallelism 1 \
+    --environment_type DOCKER \
+    --environment_config "$PYTHON_CONTAINER_IMAGE" \
+    --flink_master "localhost:$FLINK_PORT" \
+    --flink_submit_uber_jar \
+    --save_main_session
+  ) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting
+fi
 
 kill %1 || echo "Failed to shut down Flink mini cluster"
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index 380874f..ebc3d04 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -23,6 +23,7 @@ import static 
org.apache.beam.runners.fnexecution.translation.PipelineTranslator
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -42,8 +43,6 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.metrics.MetricsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PortablePipelineOptions;
-import 
org.apache.beam.sdk.options.PortablePipelineOptions.RetrievalServiceType;
 import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -170,13 +169,16 @@ public class FlinkPipelineRunner implements 
PortablePipelineRunner {
     Pipeline pipeline = 
PortablePipelineJarUtils.getPipelineFromClasspath(baseJobName);
     Struct originalOptions = 
PortablePipelineJarUtils.getPipelineOptionsFromClasspath(baseJobName);
 
-    // Flink pipeline jars distribute and retrieve artifacts via the classpath.
-    PortablePipelineOptions portablePipelineOptions =
-        
PipelineOptionsTranslation.fromProto(originalOptions).as(PortablePipelineOptions.class);
-    
portablePipelineOptions.setRetrievalServiceType(RetrievalServiceType.CLASSLOADER);
-    String retrievalToken = 
PortablePipelineJarUtils.getArtifactManifestUri(baseJobName);
+    // The retrieval token is only required by the legacy artifact service, 
which the Flink runner
+    // no longer uses.
+    String retrievalToken =
+        ArtifactApi.CommitManifestResponse.Constants.NO_ARTIFACTS_STAGED_TOKEN
+            .getValueDescriptor()
+            .getOptions()
+            .getExtension(RunnerApi.beamConstant);
 
-    FlinkPipelineOptions flinkOptions = 
portablePipelineOptions.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions flinkOptions =
+        
PipelineOptionsTranslation.fromProto(originalOptions).as(FlinkPipelineOptions.class);
     String invocationId =
         String.format("%s_%s", flinkOptions.getJobName(), 
UUID.randomUUID().toString());
 
diff --git 
a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarUtils.java
 
b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarUtils.java
index fea06c9..838afe5 100644
--- 
a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarUtils.java
+++ 
b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarUtils.java
@@ -51,7 +51,6 @@ import org.slf4j.LoggerFactory;
  *             <ul>
  *               <li>pipeline.json
  *               <li>pipeline-options.json
- *               <li>artifact-manifest.json
  *               <li>artifacts/
  *                   <ul>
  *                     <li>...artifact files...
@@ -68,7 +67,6 @@ import org.slf4j.LoggerFactory;
 public abstract class PortablePipelineJarUtils {
   private static final String ARTIFACT_FOLDER = "artifacts";
   private static final String PIPELINE_FOLDER = "BEAM-PIPELINE";
-  private static final String ARTIFACT_MANIFEST = "artifact-manifest.json";
   private static final String PIPELINE = "pipeline.json";
   private static final String PIPELINE_OPTIONS = "pipeline-options.json";
   private static final String PIPELINE_MANIFEST = PIPELINE_FOLDER + 
"/pipeline-manifest.json";
@@ -111,10 +109,6 @@ public abstract class PortablePipelineJarUtils {
     return builder.build();
   }
 
-  public static String getArtifactManifestUri(String jobName) {
-    return PIPELINE_FOLDER + "/" + jobName + "/" + ARTIFACT_MANIFEST;
-  }
-
   static String getPipelineUri(String jobName) {
     return PIPELINE_FOLDER + "/" + jobName + "/" + PIPELINE;
   }
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index f33cb47..9ba700f 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -52,8 +53,6 @@ import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.metrics.MetricsOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PortablePipelineOptions;
-import 
org.apache.beam.sdk.options.PortablePipelineOptions.RetrievalServiceType;
 import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -232,13 +231,16 @@ public class SparkPipelineRunner implements 
PortablePipelineRunner {
     Pipeline pipeline = 
PortablePipelineJarUtils.getPipelineFromClasspath(baseJobName);
     Struct originalOptions = 
PortablePipelineJarUtils.getPipelineOptionsFromClasspath(baseJobName);
 
-    // Spark pipeline jars distribute and retrieve artifacts via the classpath.
-    PortablePipelineOptions portablePipelineOptions =
-        
PipelineOptionsTranslation.fromProto(originalOptions).as(PortablePipelineOptions.class);
-    
portablePipelineOptions.setRetrievalServiceType(RetrievalServiceType.CLASSLOADER);
-    String retrievalToken = 
PortablePipelineJarUtils.getArtifactManifestUri(baseJobName);
+    // The retrieval token is only required by the legacy artifact service, 
which the Spark runner
+    // no longer uses.
+    String retrievalToken =
+        ArtifactApi.CommitManifestResponse.Constants.NO_ARTIFACTS_STAGED_TOKEN
+            .getValueDescriptor()
+            .getOptions()
+            .getExtension(RunnerApi.beamConstant);
 
-    SparkPipelineOptions sparkOptions = 
portablePipelineOptions.as(SparkPipelineOptions.class);
+    SparkPipelineOptions sparkOptions =
+        
PipelineOptionsTranslation.fromProto(originalOptions).as(SparkPipelineOptions.class);
     String invocationId =
         String.format("%s_%s", sparkOptions.getJobName(), 
UUID.randomUUID().toString());
     if (sparkOptions.getAppName() == null) {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
index ad55a8f..226ee80 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
@@ -103,18 +103,4 @@ public interface PortablePipelineOptions extends 
PipelineOptions {
   String getOutputExecutablePath();
 
   void setOutputExecutablePath(String outputExecutablePath);
-
-  /** Enumeration of the different implementations of the artifact retrieval 
service. */
-  enum RetrievalServiceType {
-    /** Artifacts are to be retrieved from a {@link 
org.apache.beam.sdk.io.FileSystem}. */
-    FILE_SYSTEM,
-    /** Artifacts are to be retrieved from the runtime {@link ClassLoader}. */
-    CLASSLOADER,
-  }
-
-  @Description("The artifact retrieval service to be used.")
-  @Default.Enum("FILE_SYSTEM")
-  RetrievalServiceType getRetrievalServiceType();
-
-  void setRetrievalServiceType(RetrievalServiceType retrievalServiceType);
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PortablePipelineOptionsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PortablePipelineOptionsTest.java
index ebbb138..675822c 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PortablePipelineOptionsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PortablePipelineOptionsTest.java
@@ -37,8 +37,5 @@ public class PortablePipelineOptionsTest {
     assertThat(options.getEnvironmentCacheMillis(), is(0));
     assertThat(options.getEnvironmentExpirationMillis(), is(0));
     assertThat(options.getOutputExecutablePath(), is(nullValue()));
-    assertThat(
-        options.getRetrievalServiceType(),
-        is(PortablePipelineOptions.RetrievalServiceType.FILE_SYSTEM));
   }
 }

Reply via email to