damccorm commented on a change in pull request #16903:
URL: https://github.com/apache/beam/pull/16903#discussion_r810349831
##########
File path: sdks/go/pkg/beam/runners/dataflow/dataflow.go
##########
@@ -159,8 +159,59 @@ func Execute(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error)
panic("Beam has not been initialized. Call beam.Init() before
pipeline construction.")
}
- // (1) Gather job options
+ beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
+ opts, err := getJobOptions(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ // (1) Build and submit
+ // NOTE(herohde) 10/8/2018: the last segment of the names must be
"worker" and "dataflow-worker.jar".
+ id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1),
time.Now().UnixNano())
+
+ modelURL := gcsx.Join(*stagingLocation, id, "model")
+ workerURL := gcsx.Join(*stagingLocation, id, "worker")
+ jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar")
+ xlangURL := gcsx.Join(*stagingLocation, id, "xlang")
+
+ edges, _, err := p.Build()
+ if err != nil {
+ return nil, err
+ }
+ artifactURLs, err := dataflowlib.ResolveXLangArtifacts(ctx, edges,
opts.Project, xlangURL)
+ if err != nil {
+ return nil, errors.WithContext(err, "resolving cross-language
artifacts")
+ }
+ opts.ArtifactURLs = artifactURLs
+ environment, err := graphx.CreateEnvironment(ctx,
jobopts.GetEnvironmentUrn(ctx), getContainerImage)
+ if err != nil {
+ return nil, errors.WithContext(err, "creating environment for
model pipeline")
+ }
+ model, err := graphx.Marshal(edges, &graphx.Options{Environment:
environment})
+ if err != nil {
+ return nil, errors.WithContext(err, "generating model pipeline")
+ }
+ err = pipelinex.ApplySdkImageOverrides(model,
jobopts.GetSdkImageOverrides())
+ if err != nil {
+ return nil, errors.WithContext(err, "applying container image
overrides")
+ }
+
+ if *dryRun {
+ log.Info(ctx, "Dry-run: not submitting job!")
+
+ log.Info(ctx, proto.MarshalTextString(model))
+ job, err := dataflowlib.Translate(ctx, model, opts, workerURL,
jarURL, modelURL)
+ if err != nil {
+ return nil, err
+ }
+ dataflowlib.PrintJob(ctx, job)
+ return nil, nil
+ }
+ return dataflowlib.Execute(ctx, model, opts, workerURL, jarURL,
modelURL, *endpoint, *executeAsync)
+}
+
+func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
Review comment:
This is definitely another L for GitHub's diffs 😢, but basically I just
moved a block of code from the top of this green (added) block into this
helper function, `getJobOptions`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]