[ 
https://issues.apache.org/jira/browse/BEAM-13912?focusedWorklogId=729878&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-729878
 ]

ASF GitHub Bot logged work on BEAM-13912:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Feb/22 21:26
            Start Date: 18/Feb/22 21:26
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on a change in pull request #16903:
URL: https://github.com/apache/beam/pull/16903#discussion_r810349831



##########
File path: sdks/go/pkg/beam/runners/dataflow/dataflow.go
##########
@@ -159,8 +159,59 @@ func Execute(ctx context.Context, p *beam.Pipeline) 
(beam.PipelineResult, error)
                panic("Beam has not been initialized. Call beam.Init() before 
pipeline construction.")
        }
 
-       // (1) Gather job options
+       beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
+       opts, err := getJobOptions(ctx)
+       if err != nil {
+               return nil, err
+       }
+
+       // (1) Build and submit
+       // NOTE(herohde) 10/8/2018: the last segment of the names must be 
"worker" and "dataflow-worker.jar".
+       id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1), 
time.Now().UnixNano())
+
+       modelURL := gcsx.Join(*stagingLocation, id, "model")
+       workerURL := gcsx.Join(*stagingLocation, id, "worker")
+       jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar")
+       xlangURL := gcsx.Join(*stagingLocation, id, "xlang")
+
+       edges, _, err := p.Build()
+       if err != nil {
+               return nil, err
+       }
+       artifactURLs, err := dataflowlib.ResolveXLangArtifacts(ctx, edges, 
opts.Project, xlangURL)
+       if err != nil {
+               return nil, errors.WithContext(err, "resolving cross-language 
artifacts")
+       }
+       opts.ArtifactURLs = artifactURLs
+       environment, err := graphx.CreateEnvironment(ctx, 
jobopts.GetEnvironmentUrn(ctx), getContainerImage)
+       if err != nil {
+               return nil, errors.WithContext(err, "creating environment for 
model pipeline")
+       }
+       model, err := graphx.Marshal(edges, &graphx.Options{Environment: 
environment})
+       if err != nil {
+               return nil, errors.WithContext(err, "generating model pipeline")
+       }
+       err = pipelinex.ApplySdkImageOverrides(model, 
jobopts.GetSdkImageOverrides())
+       if err != nil {
+               return nil, errors.WithContext(err, "applying container image 
overrides")
+       }
+
+       if *dryRun {
+               log.Info(ctx, "Dry-run: not submitting job!")
+
+               log.Info(ctx, proto.MarshalTextString(model))
+               job, err := dataflowlib.Translate(ctx, model, opts, workerURL, 
jarURL, modelURL)
+               if err != nil {
+                       return nil, err
+               }
+               dataflowlib.PrintJob(ctx, job)
+               return nil, nil
+       }
 
+       return dataflowlib.Execute(ctx, model, opts, workerURL, jarURL, 
modelURL, *endpoint, *executeAsync)
+}
+
+func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {

Review comment:
       This is definitely another L for GitHub's diffs 😢, but basically I just 
moved a block of code from the top of this green block into this helper 
function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 729878)
    Time Spent: 20m  (was: 10m)

> Increase unit testing coverage in the dataflow package
> ------------------------------------------------------
>
>                 Key: BEAM-13912
>                 URL: https://issues.apache.org/jira/browse/BEAM-13912
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Jack McCluskey
>            Assignee: Danny McCormick
>            Priority: P2
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Increase unit testing coverage in the [dataflow runner 
> package|https://github.com/apache/beam/tree/release-2.36.0/sdks/go/pkg/beam/runners/dataflow]
> We want code coverage at or above 50%; it is currently at 4.7%.
> This task will largely focus on writing test cases for the helper functions 
> in the package.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to