[ 
https://issues.apache.org/jira/browse/BEAM-6049?focusedWorklogId=166997&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166997
 ]

ASF GitHub Bot logged work on BEAM-6049:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Nov/18 18:33
            Start Date: 16/Nov/18 18:33
    Worklog Time Spent: 10m 
      Work Description: swegner closed pull request #7047: [BEAM-6049] Add 
option to load job to GCS in Dataflow Runner
URL: https://github.com/apache/beam/pull/7047
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign pull request (from a fork) is merged, the
diff is reproduced below for the sake of provenance:

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index ff1d38b859f..2b088b34c07 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -369,7 +369,7 @@ class BeamModulePlugin implements Plugin<Project> {
         google_api_services_bigquery                : 
"com.google.apis:google-api-services-bigquery:v2-rev402-$google_clients_version",
         google_api_services_clouddebugger           : 
"com.google.apis:google-api-services-clouddebugger:v2-rev253-$google_clients_version",
         google_api_services_cloudresourcemanager    : 
"com.google.apis:google-api-services-cloudresourcemanager:v1-rev502-$google_clients_version",
-        google_api_services_dataflow                : 
"com.google.apis:google-api-services-dataflow:v1b3-rev257-$google_clients_version",
+        google_api_services_dataflow                : 
"com.google.apis:google-api-services-dataflow:v1b3-rev266-$google_clients_version",
         google_api_services_pubsub                  : 
"com.google.apis:google-api-services-pubsub:v1-rev399-$google_clients_version",
         google_api_services_storage                 : 
"com.google.apis:google-api-services-storage:v1-rev136-$google_clients_version",
         google_auth_library_credentials             : 
"com.google.auth:google-auth-library-credentials:$google_auth_version",
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 680c348967d..fe3810d42db 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -196,6 +196,7 @@
   @VisibleForTesting static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 
1024 * 1024;
 
   @VisibleForTesting static final String PIPELINE_FILE_NAME = "pipeline.pb";
+  @VisibleForTesting static final String DATAFLOW_GRAPH_FILE_NAME = 
"dataflow_graph.pb";
 
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
@@ -860,6 +861,20 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       newJob.setTransformNameMapping(options.getTransformNameMapping());
       newJob.setReplaceJobId(jobIdToUpdate);
     }
+
+    // Upload the job to GCS and remove the graph object from the API call.  
The graph
+    // will be downloaded from GCS by the service.
+    if (hasExperiment(options, "upload_graph")) {
+      DataflowPackage stagedGraph =
+          options
+              .getStager()
+              .stageToFile(
+                  
DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8),
+                  DATAFLOW_GRAPH_FILE_NAME);
+      newJob.getSteps().clear();
+      newJob.setStepsLocation(stagedGraph.getLocation());
+    }
+
     Job jobResult;
     try {
       jobResult = dataflowClient.createJob(newJob);
@@ -869,9 +884,9 @@ public DataflowPipelineJob run(Pipeline pipeline) {
         if (Utf8.encodedLength(newJob.toString()) >= 
CREATE_JOB_REQUEST_LIMIT_BYTES) {
           errorMessages =
               "The size of the serialized JSON representation of the pipeline "
-                  + "exceeds the allowable limit. "
-                  + "For more information, please check the FAQ link below:\n"
-                  + "https://cloud.google.com/dataflow/faq";;
+                  + "exceeds the allowable limit for the API. Use experiment "
+                  + "'upload_graph' (--experiments=upload_graph) to direct the 
runner to "
+                  + "upload the JSON to your GCS staging bucket instead of 
embedding in the API request.";
         } else {
           errorMessages = e.getDetails().getMessage();
         }
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 43a33838724..b9d01e563d1 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -602,6 +602,24 @@ public void testUpdate() throws IOException {
     assertValidJob(jobCaptor.getValue());
   }
 
+  @Test
+  public void testUploadGraph() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setExperiments(Arrays.asList("upload_graph"));
+    Pipeline p = buildDataflowPipeline(options);
+    DataflowPipelineJob job = (DataflowPipelineJob) p.run();
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), 
jobCaptor.capture());
+    assertValidJob(jobCaptor.getValue());
+    assertTrue(jobCaptor.getValue().getSteps().isEmpty());
+    assertTrue(
+        jobCaptor
+            .getValue()
+            .getStepsLocation()
+            .startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
+  }
+
   @Test
   public void testUpdateNonExistentPipeline() throws IOException {
     thrown.expect(IllegalArgumentException.class);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 166997)
    Time Spent: 2h 40m  (was: 2.5h)

> Support Job / Graph upload to GCS in Dataflow Runner
> ----------------------------------------------------
>
>                 Key: BEAM-6049
>                 URL: https://issues.apache.org/jira/browse/BEAM-6049
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-dataflow
>            Reporter: Andrea Foegler
>            Assignee: Andrea Foegler
>            Priority: Major
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Add uploadGraph flag to support uploading the job / graph to GCS instead of 
> embedding in the request.  This change allows the API to support much larger 
> graphs than the embedded format of CreateJob.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to