This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b890399e11 Automatically switch to upload_graph when the graph is 
large (#28621)
3b890399e11 is described below

commit 3b890399e11d65695d04ae679e2fdf56555630a8
Author: liferoad <huxiangq...@gmail.com>
AuthorDate: Thu Oct 26 22:14:58 2023 -0400

    Automatically switch to upload_graph when the graph is large (#28621)
    
    * Automatically Switch to upload_graph when the graph is large
    
    * fix the formats
    
    * updated the CHANGES.md
    
    * added one test
    
    * fixed the styles
    
    * fixed the styles
    
    * fixed the styles
    
    * addressed the comments
---
 CHANGES.md                                         |  1 +
 .../beam/runners/dataflow/DataflowRunner.java      | 24 ++++++++++-----
 .../beam/runners/dataflow/DataflowRunnerTest.java  | 36 +++++++++++++++++++++-
 3 files changed, 53 insertions(+), 8 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 49b4fdfe89b..a97035fdc29 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -71,6 +71,7 @@ should handle this. 
([#25252](https://github.com/apache/beam/issues/25252)).
 ## New Features / Improvements
 
 * X feature added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
+* `upload_graph` as one of the Experiments options for DataflowRunner is no 
longer required when the graph is larger than 10MB for Java SDK 
([PR#28621](https://github.com/apache/beam/pull/28621).
 
 ## Breaking Changes
 
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 891b4c0454c..80b4e4cfd8b 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -168,7 +168,6 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.Vi
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Utf8;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -1330,15 +1329,26 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
     }
 
+    // enable upload_graph when the graph is too large
+    byte[] jobGraphBytes = 
DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8);
+    int jobGraphByteSize = jobGraphBytes.length;
+    if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
+        && !hasExperiment(options, "upload_graph")) {
+      List<String> experiments = firstNonNull(options.getExperiments(), new 
ArrayList<>());
+      experiments.add("upload_graph");
+      options.setExperiments(ImmutableList.copyOf(experiments));
+      LOG.info(
+          "The job graph size ({} in bytes) is larger than {}. Automatically 
add "
+              + "the upload_graph option to experiments.",
+          jobGraphByteSize,
+          CREATE_JOB_REQUEST_LIMIT_BYTES);
+    }
+
     // Upload the job to GCS and remove the graph object from the API call.  
The graph
     // will be downloaded from GCS by the service.
     if (hasExperiment(options, "upload_graph")) {
       DataflowPackage stagedGraph =
-          options
-              .getStager()
-              .stageToFile(
-                  
DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8),
-                  DATAFLOW_GRAPH_FILE_NAME);
+          options.getStager().stageToFile(jobGraphBytes, 
DATAFLOW_GRAPH_FILE_NAME);
       newJob.getSteps().clear();
       newJob.setStepsLocation(stagedGraph.getLocation());
     }
@@ -1398,7 +1408,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     } catch (GoogleJsonResponseException e) {
       String errorMessages = "Unexpected errors";
       if (e.getDetails() != null) {
-        if (Utf8.encodedLength(newJob.toString()) >= 
CREATE_JOB_REQUEST_LIMIT_BYTES) {
+        if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES) {
           errorMessages =
               "The size of the serialized JSON representation of the pipeline "
                   + "exceeds the allowable limit. "
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 078f25e0e38..bcdea03dba2 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -242,7 +242,7 @@ public class DataflowRunnerTest implements Serializable {
     mockJobs = mock(Dataflow.Projects.Locations.Jobs.class);
   }
 
-  private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
+  private static Pipeline buildDataflowPipeline(DataflowPipelineOptions 
options) {
     options.setStableUniqueNames(CheckEnabled.ERROR);
     options.setRunner(DataflowRunner.class);
     Pipeline p = Pipeline.create(options);
@@ -256,6 +256,22 @@ public class DataflowRunnerTest implements Serializable {
     return p;
   }
 
+  private static Pipeline 
buildDataflowPipelineWithLargeGraph(DataflowPipelineOptions options) {
+    options.setStableUniqueNames(CheckEnabled.ERROR);
+    options.setRunner(DataflowRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    for (int i = 0; i < 100; i++) {
+      p.apply("ReadMyFile_" + i, TextIO.read().from("gs://bucket/object"))
+          .apply("WriteMyFile_" + i, TextIO.write().to("gs://bucket/object"));
+    }
+
+    // Enable the FileSystems API to know about gs:// URIs in this test.
+    FileSystems.setDefaultPipelineOptions(options);
+
+    return p;
+  }
+
   private static Dataflow buildMockDataflow(Dataflow.Projects.Locations.Jobs 
mockJobs)
       throws IOException {
     Dataflow mockDataflowClient = mock(Dataflow.class);
@@ -824,6 +840,24 @@ public class DataflowRunnerTest implements Serializable {
             .startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
   }
 
+  /** Test for automatically using upload_graph when the job graph is too 
large (>10MB). */
+  @Test
+  public void testUploadGraphWithAutoUpload() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    Pipeline p = buildDataflowPipelineWithLargeGraph(options);
+    p.run();
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), 
jobCaptor.capture());
+    assertValidJob(jobCaptor.getValue());
+    assertTrue(jobCaptor.getValue().getSteps().isEmpty());
+    assertTrue(
+        jobCaptor
+            .getValue()
+            .getStepsLocation()
+            .startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
+  }
+
   @Test
   public void testUpdateNonExistentPipeline() throws IOException {
     thrown.expect(IllegalArgumentException.class);

Reply via email to