[ https://issues.apache.org/jira/browse/BEAM-6113?focusedWorklogId=169558&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-169558 ]

ASF GitHub Bot logged work on BEAM-6113:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Nov/18 23:25
            Start Date: 26/Nov/18 23:25
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #7120: [BEAM-6113] Small changes to avoid user errors around beam.Init()
URL: https://github.com/apache/beam/pull/7120
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/go/examples/minimal_wordcount/minimal_wordcount.go b/sdks/go/examples/minimal_wordcount/minimal_wordcount.go
index 628429368b00..5fe6acce9471 100644
--- a/sdks/go/examples/minimal_wordcount/minimal_wordcount.go
+++ b/sdks/go/examples/minimal_wordcount/minimal_wordcount.go
@@ -53,6 +53,9 @@ import (
 var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
 
 func main() {
+       // beam.Init() is an initialization hook that must be called on startup.
+       beam.Init()
+
        // Create the Pipeline object and root scope.
        p := beam.NewPipeline()
        s := p.Root()
diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go
index f28d1254d427..fabaa601f13d 100644
--- a/sdks/go/examples/wordcount/wordcount.go
+++ b/sdks/go/examples/wordcount/wordcount.go
@@ -151,7 +151,7 @@ func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
 func main() {
        // If beamx or Go flags are used, flags must be parsed first.
        flag.Parse()
-       // beam.Init() is an initialization hook that must called on startup. On
+       // beam.Init() is an initialization hook that must be called on startup. On
        // distributed runners, it is used to intercept control.
        beam.Init()
 
diff --git a/sdks/go/pkg/beam/core/runtime/init.go b/sdks/go/pkg/beam/core/runtime/init.go
index db9850f95931..3a1e281f1569 100644
--- a/sdks/go/pkg/beam/core/runtime/init.go
+++ b/sdks/go/pkg/beam/core/runtime/init.go
@@ -42,3 +42,8 @@ func Init() {
                hook()
        }
 }
+
+// Expose the initialization status for runners.
+func Initialized() bool {
+       return initialized
+}
diff --git a/sdks/go/pkg/beam/forward.go b/sdks/go/pkg/beam/forward.go
index c0b2ba02fd90..77d7e4b183cf 100644
--- a/sdks/go/pkg/beam/forward.go
+++ b/sdks/go/pkg/beam/forward.go
@@ -64,6 +64,11 @@ func Init() {
        runtime.Init()
 }
 
+// Expose the initialization status for runners.
+func Initialized() bool {
+    return runtime.Initialized()
+}
+
 // PipelineOptions are global options for the active pipeline. Options can
 // be defined any time before execution and are re-created by the harness on
 // remote execution workers. Global options should be used sparingly.
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go b/sdks/go/pkg/beam/runners/direct/direct.go
index 1d50aadcd402..f45639123483 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -36,6 +36,12 @@ func init() {
 
 // Execute runs the pipeline in-process.
 func Execute(ctx context.Context, p *beam.Pipeline) error {
+       log.Info(ctx, "Executing pipeline with the direct runner.")
+
+       if !beam.Initialized() {
+               log.Warn(ctx, "Beam has not been initialized. Call beam.Init() before pipeline construction.")
+       }
+
        log.Info(ctx, "Pipeline:")
        log.Info(ctx, p)
 
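The diff relies on a simple pattern: hooks registered at package load time are run by Init(), and a boolean flag records that Init() happened so runners can query it through Initialized(). A minimal, self-contained sketch of that pattern, assuming a simplified model of the SDK's runtime package: the names initHook, hooks, registerInit, and execute are hypothetical, and this is not the actual Beam source.

```go
package main

import (
	"fmt"
	"log"
)

// initHook is a hypothetical stand-in for the SDK's hook type.
type initHook func()

var (
	hooks       []initHook // hooks queued before Init runs (hypothetical)
	initialized bool       // set once Init has run
)

// registerInit is a hypothetical helper that queues a hook to run in Init.
func registerInit(h initHook) {
	hooks = append(hooks, h)
}

// Init marks the program as initialized and runs every registered hook.
// Assumption: in this sketch, repeat calls are no-ops, so hooks run once.
func Init() {
	if initialized {
		return
	}
	initialized = true
	for _, h := range hooks {
		h()
	}
}

// Initialized reports whether Init has been called, mirroring the
// accessor the PR adds to the runtime and beam packages.
func Initialized() bool {
	return initialized
}

// execute mimics the direct runner's new check: it logs a warning
// when Init was skipped, but does not stop execution.
func execute() string {
	if !Initialized() {
		log.Print("Beam has not been initialized. Call beam.Init() before pipeline construction.")
		return "warned"
	}
	return "ok"
}

func main() {
	registerInit(func() { fmt.Println("hook ran") })
	fmt.Println(execute()) // Init not called yet: warning is logged to stderr
	Init()
	fmt.Println(execute())
}
```

Because the direct runner change only logs a warning, a pipeline that skips beam.Init() still runs locally; the warning is the "some signal" the issue asks for in the short term.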


 



Issue Time Tracking
-------------------

    Worklog Id:     (was: 169558)
    Time Spent: 1h  (was: 50m)

> Streamline beam.Init() requirement for Go pipelines.
> ----------------------------------------------------
>
>                 Key: BEAM-6113
>                 URL: https://issues.apache.org/jira/browse/BEAM-6113
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Daniel Oliveira
>            Assignee: Daniel Oliveira
>            Priority: Minor
>              Labels: documentation, usability
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> When writing a Go pipeline, it's necessary to call beam.Init() at the
> beginning of your code, but right now there is almost no validation or
> checking around that. This makes it very easy for a new user of Beam Go to
> accidentally leave it out and not get a clear signal for why their pipeline
> isn't being executed as expected. This issue is for tracking progress towards
> streamlining this requirement to improve the user experience.
> In the short term, this can be easily improved by updating documentation and
> examples to clearly communicate the necessity of beam.Init(), and by adding
> log warnings that provide some signal if the user forgot to call it.
> In the long term, the best solution would be to avoid the problem
> altogether (for example by removing the need for beam.Init(), or by
> having it called implicitly without relying on users to call it), or to
> create a stronger failure state, for example by having pipelines without
> beam.Init() immediately break with a clear message notifying the user that it
> was not called.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
