This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 27f4836 [BEAM-10762] Fix artifact staging bug in Flink/Spark uber jar runners.
new d52f545 Merge pull request #12803 from ibzib/BEAM-10762
27f4836 is described below
commit 27f4836be06f38daf5741847c77e557b8b26a201
Author: Kyle Weaver <[email protected]>
AuthorDate: Wed Sep 9 16:45:00 2020 -0700
[BEAM-10762] Fix artifact staging bug in Flink/Spark uber jar runners.
- Replace the obsolete artifact staging token with the NO_ARTIFACTS_STAGED_TOKEN placeholder.
- Add a test that verifies pipelines pass when no artifacts are staged.
---
runners/flink/job-server/flink_job_server.gradle | 12 +++--
runners/flink/job-server/test_flink_uber_jar.sh | 60 +++++++++++++++-------
.../beam/runners/flink/FlinkPipelineRunner.java | 18 ++++---
.../jobsubmission/PortablePipelineJarUtils.java | 6 ---
.../beam/runners/spark/SparkPipelineRunner.java | 18 ++++---
.../beam/sdk/options/PortablePipelineOptions.java | 14 -----
.../sdk/options/PortablePipelineOptionsTest.java | 3 --
7 files changed, 71 insertions(+), 60 deletions(-)
diff --git a/runners/flink/job-server/flink_job_server.gradle
b/runners/flink/job-server/flink_job_server.gradle
index 5bebc7a..556857b 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -257,9 +257,9 @@ task miniCluster(type: Jar, dependsOn: shadowJar) {
zip64 true // jar needs to contain more than 65535 files
}
-def addTestFlinkUberJarPy(String pyVersion) {
+def addTestFlinkUberJarPy(String pyVersion, boolean saveMainSession) {
def pyBuildPath = pyVersion.startsWith("2") ? "2" :
pyVersion.replaceAll("\\.", "")
- project.tasks.create(name: "testFlinkUberJarPy${pyBuildPath}") {
+ project.tasks.create(name:
"testFlinkUberJarPy${pyBuildPath}${saveMainSession ? 'saveMainSession' : ''}") {
dependsOn miniCluster
dependsOn shadowJar
dependsOn ":sdks:python:container:py${pyBuildPath}:docker"
@@ -276,6 +276,9 @@ def addTestFlinkUberJarPy(String pyVersion) {
+ "${project.docker_image_default_repo_prefix}"
+ "python${pyVersion}_sdk:${project.sdk_version}",
]
+ if (saveMainSession) {
+ options.add('--save_main_session')
+ }
args "-c", "../../job-server/test_flink_uber_jar.sh ${options.join('
')}"
}
}
@@ -284,11 +287,14 @@ def addTestFlinkUberJarPy(String pyVersion) {
["2.7", "3.5", "3.6", "3.7"].each{ pyVersion ->
addTestJavaJarCreator(pyVersion)
- addTestFlinkUberJarPy(pyVersion)
+ addTestFlinkUberJarPy(pyVersion, false)
+ addTestFlinkUberJarPy(pyVersion, true)
}
task testPipelineJar() {
dependsOn testJavaJarCreatorPy37
dependsOn testFlinkUberJarPy36
+ dependsOn testFlinkUberJarPy36saveMainSession
dependsOn testFlinkUberJarPy37
+ dependsOn testFlinkUberJarPy37saveMainSession
}
diff --git a/runners/flink/job-server/test_flink_uber_jar.sh
b/runners/flink/job-server/test_flink_uber_jar.sh
index ace5da9..1af7da6 100755
--- a/runners/flink/job-server/test_flink_uber_jar.sh
+++ b/runners/flink/job-server/test_flink_uber_jar.sh
@@ -21,6 +21,8 @@
set -e
set -v
+SAVE_MAIN_SESSION=0
+
while [[ $# -gt 0 ]]
do
key="$1"
@@ -55,6 +57,10 @@ case $key in
shift # past argument
shift # past value
;;
+ --save_main_session)
+ SAVE_MAIN_SESSION=1
+ shift # past argument
+ ;;
*) # unknown option
echo "Unknown option: $1"
exit 1
@@ -105,32 +111,50 @@ from apache_beam.transforms import Map
logging.basicConfig(level=logging.INFO)
-# To test that our main session is getting plumbed through artifact staging
-# correctly, create a global variable. If the main session is not plumbed
-# through properly, global_var will be undefined and the pipeline will fail.
-global_var = 1
-
pipeline_options = PipelineOptions()
-pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline = beam.Pipeline(options=pipeline_options)
-pcoll = (pipeline
- | Create([0, 1, 2])
- | Map(lambda x: x + global_var))
+
+if pipeline_options.view_as(SetupOptions).save_main_session:
+ # To test that our main session is getting plumbed through artifact staging
+ # correctly, create a global variable. If the main session is not plumbed
+ # through properly, global_var will be undefined and the pipeline will fail.
+ global_var = 1
+ pcoll = (pipeline
+ | Create([0, 1, 2])
+ | Map(lambda x: x + global_var))
+else:
+ pcoll = (pipeline
+ | Create([0, 1, 2])
+ | Map(lambda x: x + 1))
+
assert_that(pcoll, equal_to([1, 2, 3]))
result = pipeline.run()
result.wait_until_finish()
"
-(python -c "$PIPELINE_PY" \
- --runner FlinkRunner \
- --flink_job_server_jar "$FLINK_JOB_SERVER_JAR" \
- --parallelism 1 \
- --environment_type DOCKER \
- --environment_config "$PYTHON_CONTAINER_IMAGE" \
- --flink_master "localhost:$FLINK_PORT" \
- --flink_submit_uber_jar \
-) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting
+if [[ "$SAVE_MAIN_SESSION" -eq 0 ]]; then
+ (python -c "$PIPELINE_PY" \
+ --runner FlinkRunner \
+ --flink_job_server_jar "$FLINK_JOB_SERVER_JAR" \
+ --parallelism 1 \
+ --environment_type DOCKER \
+ --environment_config "$PYTHON_CONTAINER_IMAGE" \
+ --flink_master "localhost:$FLINK_PORT" \
+ --flink_submit_uber_jar \
+ ) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting
+else
+ (python -c "$PIPELINE_PY" \
+ --runner FlinkRunner \
+ --flink_job_server_jar "$FLINK_JOB_SERVER_JAR" \
+ --parallelism 1 \
+ --environment_type DOCKER \
+ --environment_config "$PYTHON_CONTAINER_IMAGE" \
+ --flink_master "localhost:$FLINK_PORT" \
+ --flink_submit_uber_jar \
+ --save_main_session
+ ) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting
+fi
kill %1 || echo "Failed to shut down Flink mini cluster"
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index 380874f..ebc3d04 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -23,6 +23,7 @@ import static
org.apache.beam.runners.fnexecution.translation.PipelineTranslator
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -42,8 +43,6 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PortablePipelineOptions;
-import
org.apache.beam.sdk.options.PortablePipelineOptions.RetrievalServiceType;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobExecutionResult;
@@ -170,13 +169,16 @@ public class FlinkPipelineRunner implements
PortablePipelineRunner {
Pipeline pipeline =
PortablePipelineJarUtils.getPipelineFromClasspath(baseJobName);
Struct originalOptions =
PortablePipelineJarUtils.getPipelineOptionsFromClasspath(baseJobName);
- // Flink pipeline jars distribute and retrieve artifacts via the classpath.
- PortablePipelineOptions portablePipelineOptions =
-
PipelineOptionsTranslation.fromProto(originalOptions).as(PortablePipelineOptions.class);
-
portablePipelineOptions.setRetrievalServiceType(RetrievalServiceType.CLASSLOADER);
- String retrievalToken =
PortablePipelineJarUtils.getArtifactManifestUri(baseJobName);
+ // The retrieval token is only required by the legacy artifact service, which the Flink runner
+ // no longer uses.
+ String retrievalToken =
+ ArtifactApi.CommitManifestResponse.Constants.NO_ARTIFACTS_STAGED_TOKEN
+ .getValueDescriptor()
+ .getOptions()
+ .getExtension(RunnerApi.beamConstant);
- FlinkPipelineOptions flinkOptions =
portablePipelineOptions.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions flinkOptions =
+
PipelineOptionsTranslation.fromProto(originalOptions).as(FlinkPipelineOptions.class);
String invocationId =
String.format("%s_%s", flinkOptions.getJobName(),
UUID.randomUUID().toString());
diff --git
a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarUtils.java
b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarUtils.java
index fea06c9..838afe5 100644
---
a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarUtils.java
+++
b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarUtils.java
@@ -51,7 +51,6 @@ import org.slf4j.LoggerFactory;
* <ul>
* <li>pipeline.json
* <li>pipeline-options.json
- * <li>artifact-manifest.json
* <li>artifacts/
* <ul>
* <li>...artifact files...
@@ -68,7 +67,6 @@ import org.slf4j.LoggerFactory;
public abstract class PortablePipelineJarUtils {
private static final String ARTIFACT_FOLDER = "artifacts";
private static final String PIPELINE_FOLDER = "BEAM-PIPELINE";
- private static final String ARTIFACT_MANIFEST = "artifact-manifest.json";
private static final String PIPELINE = "pipeline.json";
private static final String PIPELINE_OPTIONS = "pipeline-options.json";
private static final String PIPELINE_MANIFEST = PIPELINE_FOLDER +
"/pipeline-manifest.json";
@@ -111,10 +109,6 @@ public abstract class PortablePipelineJarUtils {
return builder.build();
}
- public static String getArtifactManifestUri(String jobName) {
- return PIPELINE_FOLDER + "/" + jobName + "/" + ARTIFACT_MANIFEST;
- }
-
static String getPipelineUri(String jobName) {
return PIPELINE_FOLDER + "/" + jobName + "/" + PIPELINE;
}
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index f33cb47..9ba700f 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -52,8 +53,6 @@ import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PortablePipelineOptions;
-import
org.apache.beam.sdk.options.PortablePipelineOptions.RetrievalServiceType;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.api.java.JavaSparkContext;
@@ -232,13 +231,16 @@ public class SparkPipelineRunner implements
PortablePipelineRunner {
Pipeline pipeline =
PortablePipelineJarUtils.getPipelineFromClasspath(baseJobName);
Struct originalOptions =
PortablePipelineJarUtils.getPipelineOptionsFromClasspath(baseJobName);
- // Spark pipeline jars distribute and retrieve artifacts via the classpath.
- PortablePipelineOptions portablePipelineOptions =
-
PipelineOptionsTranslation.fromProto(originalOptions).as(PortablePipelineOptions.class);
-
portablePipelineOptions.setRetrievalServiceType(RetrievalServiceType.CLASSLOADER);
- String retrievalToken =
PortablePipelineJarUtils.getArtifactManifestUri(baseJobName);
+ // The retrieval token is only required by the legacy artifact service, which the Spark runner
+ // no longer uses.
+ String retrievalToken =
+ ArtifactApi.CommitManifestResponse.Constants.NO_ARTIFACTS_STAGED_TOKEN
+ .getValueDescriptor()
+ .getOptions()
+ .getExtension(RunnerApi.beamConstant);
- SparkPipelineOptions sparkOptions =
portablePipelineOptions.as(SparkPipelineOptions.class);
+ SparkPipelineOptions sparkOptions =
+
PipelineOptionsTranslation.fromProto(originalOptions).as(SparkPipelineOptions.class);
String invocationId =
String.format("%s_%s", sparkOptions.getJobName(),
UUID.randomUUID().toString());
if (sparkOptions.getAppName() == null) {
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
index ad55a8f..226ee80 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
@@ -103,18 +103,4 @@ public interface PortablePipelineOptions extends
PipelineOptions {
String getOutputExecutablePath();
void setOutputExecutablePath(String outputExecutablePath);
-
- /** Enumeration of the different implementations of the artifact retrieval
service. */
- enum RetrievalServiceType {
- /** Artifacts are to be retrieved from a {@link
org.apache.beam.sdk.io.FileSystem}. */
- FILE_SYSTEM,
- /** Artifacts are to be retrieved from the runtime {@link ClassLoader}. */
- CLASSLOADER,
- }
-
- @Description("The artifact retrieval service to be used.")
- @Default.Enum("FILE_SYSTEM")
- RetrievalServiceType getRetrievalServiceType();
-
- void setRetrievalServiceType(RetrievalServiceType retrievalServiceType);
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PortablePipelineOptionsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PortablePipelineOptionsTest.java
index ebbb138..675822c 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PortablePipelineOptionsTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PortablePipelineOptionsTest.java
@@ -37,8 +37,5 @@ public class PortablePipelineOptionsTest {
assertThat(options.getEnvironmentCacheMillis(), is(0));
assertThat(options.getEnvironmentExpirationMillis(), is(0));
assertThat(options.getOutputExecutablePath(), is(nullValue()));
- assertThat(
- options.getRetrievalServiceType(),
- is(PortablePipelineOptions.RetrievalServiceType.FILE_SYSTEM));
}
}