This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch users/damccorm/upload_graph_v2 in repository https://gitbox.apache.org/repos/asf/beam.git
commit 6c35401299b3c1b0b50c78b95a3917d546cc33b0 Author: Danny McCormick <[email protected]> AuthorDate: Tue Aug 13 15:13:31 2024 +0200 Fix upload_graph on v2 --- runners/google-cloud-dataflow-java/build.gradle | 2 +- .../apache/beam/runners/dataflow/DataflowRunner.java | 14 +++++++++++++- .../beam/runners/dataflow/DataflowRunnerTest.java | 17 +++++++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 55f0074b9f3..6728ca64569 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -506,7 +506,7 @@ task validatesRunnerV2Streaming { description = "Runs the ValidatesRunner tests on Dataflow Runner V2 forcing streaming mode" dependsOn(createRunnerV2ValidatesRunnerTest( name: 'validatesRunnerV2TestStreaming', - pipelineOptions: runnerV2PipelineOptions + ['--streaming', '--experiments=enable_streaming_engine'], + pipelineOptions: runnerV2PipelineOptions + ['--streaming', '--experiments=enable_streaming_engine', '--experiments=upload_graph'], excludedCategories: [ 'org.apache.beam.sdk.testing.LargeKeys$Above10KB', 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo', diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 708c6341326..6d1317f08c3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1385,7 +1385,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { byte[] jobGraphBytes = DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8); int jobGraphByteSize = jobGraphBytes.length; if (jobGraphByteSize >= 
CREATE_JOB_REQUEST_LIMIT_BYTES - && !hasExperiment(options, "upload_graph")) { + && !hasExperiment(options, "upload_graph") + && !useUnifiedWorker(options)) { List<String> experiments = firstNonNull(options.getExperiments(), Collections.emptyList()); options.setExperiments( ImmutableList.<String>builder().addAll(experiments).add("upload_graph").build()); @@ -1396,6 +1397,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { CREATE_JOB_REQUEST_LIMIT_BYTES); } + if (hasExperiment(options, "upload_graph") && useUnifiedWorker(options)) { + List<String> experiments = new ArrayList<>(options.getExperiments()); + while (experiments.contains("upload_graph")) { + experiments.remove("upload_graph"); + } + options.setExperiments(experiments); + LOG.warn( + "The upload_graph experiment was specified, but it does not apply " + + "to runner v2 jobs. Option has been automatically removed."); + } + // Upload the job to GCS and remove the graph object from the API call. The graph // will be downloaded from GCS by the service. 
if (hasExperiment(options, "upload_graph")) { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index cf1066e41d2..bbfaa84aa9a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -838,6 +838,19 @@ public class DataflowRunnerTest implements Serializable { .startsWith("gs://valid-bucket/temp/staging/dataflow_graph")); } + @Test + public void testUploadGraphV2IsNoOp() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + options.setExperiments(Arrays.asList("upload_graph", "use_runner_v2")); + Pipeline p = buildDataflowPipeline(options); + p.run(); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture()); + assertValidJob(jobCaptor.getValue()); + assertNull(jobCaptor.getValue().getStepsLocation()); + } + /** Test for automatically using upload_graph when the job graph is too large (>10MB). */ @Test public void testUploadGraphWithAutoUpload() throws IOException {
