damccorm commented on a change in pull request #16903:
URL: https://github.com/apache/beam/pull/16903#discussion_r810349831



##########
File path: sdks/go/pkg/beam/runners/dataflow/dataflow.go
##########
@@ -159,8 +159,59 @@ func Execute(ctx context.Context, p *beam.Pipeline) 
(beam.PipelineResult, error)
                panic("Beam has not been initialized. Call beam.Init() before 
pipeline construction.")
        }
 
-       // (1) Gather job options
+       beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
+       opts, err := getJobOptions(ctx)
+       if err != nil {
+               return nil, err
+       }
+
+       // (1) Build and submit
+       // NOTE(herohde) 10/8/2018: the last segment of the names must be 
"worker" and "dataflow-worker.jar".
+       id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1), 
time.Now().UnixNano())
+
+       modelURL := gcsx.Join(*stagingLocation, id, "model")
+       workerURL := gcsx.Join(*stagingLocation, id, "worker")
+       jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar")
+       xlangURL := gcsx.Join(*stagingLocation, id, "xlang")
+
+       edges, _, err := p.Build()
+       if err != nil {
+               return nil, err
+       }
+       artifactURLs, err := dataflowlib.ResolveXLangArtifacts(ctx, edges, 
opts.Project, xlangURL)
+       if err != nil {
+               return nil, errors.WithContext(err, "resolving cross-language 
artifacts")
+       }
+       opts.ArtifactURLs = artifactURLs
+       environment, err := graphx.CreateEnvironment(ctx, 
jobopts.GetEnvironmentUrn(ctx), getContainerImage)
+       if err != nil {
+               return nil, errors.WithContext(err, "creating environment for 
model pipeline")
+       }
+       model, err := graphx.Marshal(edges, &graphx.Options{Environment: 
environment})
+       if err != nil {
+               return nil, errors.WithContext(err, "generating model pipeline")
+       }
+       err = pipelinex.ApplySdkImageOverrides(model, 
jobopts.GetSdkImageOverrides())
+       if err != nil {
+               return nil, errors.WithContext(err, "applying container image 
overrides")
+       }
+
+       if *dryRun {
+               log.Info(ctx, "Dry-run: not submitting job!")
+
+               log.Info(ctx, proto.MarshalTextString(model))
+               job, err := dataflowlib.Translate(ctx, model, opts, workerURL, 
jarURL, modelURL)
+               if err != nil {
+                       return nil, err
+               }
+               dataflowlib.PrintJob(ctx, job)
+               return nil, nil
+       }
 
+       return dataflowlib.Execute(ctx, model, opts, workerURL, jarURL, 
modelURL, *endpoint, *executeAsync)
+}
+
+func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {

Review comment:
       This is definitely another L for github's diffs 😢 but basically I just 
moved a block of code from the top of this green block into this helper 
function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to