This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/upload_graph_v2
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6c35401299b3c1b0b50c78b95a3917d546cc33b0
Author: Danny McCormick <[email protected]>
AuthorDate: Tue Aug 13 15:13:31 2024 +0200

    Fix upload_graph on v2
---
 runners/google-cloud-dataflow-java/build.gradle         |  2 +-
 .../apache/beam/runners/dataflow/DataflowRunner.java    | 14 +++++++++++++-
 .../beam/runners/dataflow/DataflowRunnerTest.java       | 17 +++++++++++++++++
 3 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/build.gradle 
b/runners/google-cloud-dataflow-java/build.gradle
index 55f0074b9f3..6728ca64569 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -506,7 +506,7 @@ task validatesRunnerV2Streaming {
   description = "Runs the ValidatesRunner tests on Dataflow Runner V2 forcing 
streaming mode"
   dependsOn(createRunnerV2ValidatesRunnerTest(
     name: 'validatesRunnerV2TestStreaming',
-    pipelineOptions: runnerV2PipelineOptions + ['--streaming', 
'--experiments=enable_streaming_engine'],
+    pipelineOptions: runnerV2PipelineOptions + ['--streaming', 
'--experiments=enable_streaming_engine', '--experiments=upload_graph'],
     excludedCategories: [
       'org.apache.beam.sdk.testing.LargeKeys$Above10KB',
       'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo',
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 708c6341326..6d1317f08c3 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1385,7 +1385,8 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     byte[] jobGraphBytes = 
DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8);
     int jobGraphByteSize = jobGraphBytes.length;
     if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
-        && !hasExperiment(options, "upload_graph")) {
+        && !hasExperiment(options, "upload_graph")
+        && !useUnifiedWorker(options)) {
       List<String> experiments = firstNonNull(options.getExperiments(), 
Collections.emptyList());
       options.setExperiments(
           
ImmutableList.<String>builder().addAll(experiments).add("upload_graph").build());
@@ -1396,6 +1397,17 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
           CREATE_JOB_REQUEST_LIMIT_BYTES);
     }
 
+    if (hasExperiment(options, "upload_graph") && useUnifiedWorker(options)) {
+      List<String> experiments = options.getExperiements();
+      while (list.contains("upload_graph")) {
+        list.remove("upload_graph");
+      }
+      options.setExperiments(experiments)
+      LOG.warning(
+          "The upload_graph experiment was specified, but it does not apply "
+             + "to runner v2 jobs. Option has been automatically removed.")
+    }
+
     // Upload the job to GCS and remove the graph object from the API call.  
The graph
     // will be downloaded from GCS by the service.
     if (hasExperiment(options, "upload_graph")) {
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index cf1066e41d2..bbfaa84aa9a 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -838,6 +838,23 @@ public class DataflowRunnerTest implements Serializable {
             .startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
   }
 
+  @Test
+  public void testUploadGraphV2IsNoOp() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setExperiments(Arrays.asList("upload_graph", "use_runner_v2"));
+    Pipeline p = buildDataflowPipeline(options);
+    p.run();
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), 
jobCaptor.capture());
+    assertValidJob(jobCaptor.getValue());
+    assertTrue(
+        jobCaptor
+            .getValue()
+            .getStepsLocation()
+            .startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
+  }
+
   /** Test for automatically using upload_graph when the job graph is too 
large (>10MB). */
   @Test
   public void testUploadGraphWithAutoUpload() throws IOException {

Reply via email to