[ 
https://issues.apache.org/jira/browse/BEAM-2083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16283021#comment-16283021
 ] 

ASF GitHub Bot commented on BEAM-2083:
--------------------------------------

kennknowles closed pull request #4230: [BEAM-2083] Make Scope a value type in 
Go SDK
URL: https://github.com/apache/beam/pull/4230
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/examples/contains/contains.go 
b/sdks/go/examples/contains/contains.go
index 7bdfb10b82c..45a8527a851 100644
--- a/sdks/go/examples/contains/contains.go
+++ b/sdks/go/examples/contains/contains.go
@@ -42,7 +42,7 @@ func init() {
 }
 
 // FilterWords returns PCollection<KV<word,count>> with (up to) 10 matching 
words.
-func FilterWords(s *beam.Scope, lines beam.PCollection) beam.PCollection {
+func FilterWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
        s = s.Scope("FilterWords")
        words := beam.ParDo(s, extractFn, lines)
        filtered := beam.ParDo(s, &includeFn{Search: *search}, words)
diff --git a/sdks/go/examples/cookbook/combine/combine.go 
b/sdks/go/examples/cookbook/combine/combine.go
index 5d9fe77de98..7e24aa1fb30 100644
--- a/sdks/go/examples/cookbook/combine/combine.go
+++ b/sdks/go/examples/cookbook/combine/combine.go
@@ -50,7 +50,7 @@ type PlaysRow struct {
 // PlaysForWords generates a string containing the list of play names
 // in which that word appears. It takes a PCollection<WordRow> and
 // returns a PCollection<PlaysRow>.
-func PlaysForWords(s *beam.Scope, rows beam.PCollection) beam.PCollection {
+func PlaysForWords(s beam.Scope, rows beam.PCollection) beam.PCollection {
        s = s.Scope("PlaysForWords")
 
        words := beam.ParDo(s, &extractFn{MinLength: *minLength}, rows)
diff --git a/sdks/go/examples/cookbook/filter/filter.go 
b/sdks/go/examples/cookbook/filter/filter.go
index 66998c4509a..effc1225655 100644
--- a/sdks/go/examples/cookbook/filter/filter.go
+++ b/sdks/go/examples/cookbook/filter/filter.go
@@ -45,7 +45,7 @@ type WeatherDataRow struct {
 
 // BelowGlobalMean computes the rows for the given month below the global 
mean. It takes a
 // PCollection<WeatherDataRow> and returns a PCollection<WeatherDataRow>.
-func BelowGlobalMean(s *beam.Scope, m int, rows beam.PCollection) 
beam.PCollection {
+func BelowGlobalMean(s beam.Scope, m int, rows beam.PCollection) 
beam.PCollection {
        s = s.Scope("BelowGlobalMean")
 
        // Find the global mean of all the mean_temp readings in the weather 
data.
diff --git a/sdks/go/examples/cookbook/max/max.go 
b/sdks/go/examples/cookbook/max/max.go
index eb7e07dec63..f0184354afa 100644
--- a/sdks/go/examples/cookbook/max/max.go
+++ b/sdks/go/examples/cookbook/max/max.go
@@ -47,7 +47,7 @@ type MaxMeanTempRow struct {
 
 // MaxMeanTemp finds the max mean_temp for each month. It takes a
 // PCollection<WeatherDataRow> and returns a PCollection<MaxMeanTempRow>.
-func MaxMeanTemp(s *beam.Scope, rows beam.PCollection) beam.PCollection {
+func MaxMeanTemp(s beam.Scope, rows beam.PCollection) beam.PCollection {
        s = s.Scope("MaxMeanTemp")
 
        keyed := beam.ParDo(s, extractFn, rows)
diff --git a/sdks/go/examples/cookbook/tornadoes/tornadoes.go 
b/sdks/go/examples/cookbook/tornadoes/tornadoes.go
index d46ad60bb49..058948ba32e 100644
--- a/sdks/go/examples/cookbook/tornadoes/tornadoes.go
+++ b/sdks/go/examples/cookbook/tornadoes/tornadoes.go
@@ -71,7 +71,7 @@ type TornadoRow struct {
 
 // CountTornadoes computes the number of tornadoes pr month. It takes a
 // PCollection<WeatherDataRow> and returns a PCollection<TornadoRow>.
-func CountTornadoes(s *beam.Scope, rows beam.PCollection) beam.PCollection {
+func CountTornadoes(s beam.Scope, rows beam.PCollection) beam.PCollection {
        s = s.Scope("CountTornadoes")
 
        // row... => month...
diff --git a/sdks/go/examples/debugging_wordcount/debugging_wordcount.go 
b/sdks/go/examples/debugging_wordcount/debugging_wordcount.go
index 17b7ccacc2c..a17a75f3104 100644
--- a/sdks/go/examples/debugging_wordcount/debugging_wordcount.go
+++ b/sdks/go/examples/debugging_wordcount/debugging_wordcount.go
@@ -122,7 +122,7 @@ func formatFn(w string, c int) string {
 // CountWords is a composite transform that counts the words of an PCollection
 // of lines. It expects a PCollection of type string and returns a PCollection
 // of type KV<string,int>.
-func CountWords(s *beam.Scope, lines beam.PCollection) beam.PCollection {
+func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
        s = s.Scope("CountWords")
        col := beam.ParDo(s, extractFn, lines)
        return stats.Count(s, col)
diff --git a/sdks/go/examples/forest/forest.go 
b/sdks/go/examples/forest/forest.go
index af61ce64230..06ae7831a45 100644
--- a/sdks/go/examples/forest/forest.go
+++ b/sdks/go/examples/forest/forest.go
@@ -42,7 +42,7 @@ var (
        depth = flag.Int("depth", 3, "Depth of each tree")
 )
 
-func tree(s *beam.Scope, depth int) beam.PCollection {
+func tree(s beam.Scope, depth int) beam.PCollection {
        if depth <= 0 {
                return leaf(s)
        }
@@ -54,7 +54,7 @@ func tree(s *beam.Scope, depth int) beam.PCollection {
 
 var count = 0
 
-func leaf(s *beam.Scope) beam.PCollection {
+func leaf(s beam.Scope) beam.PCollection {
        count++
        return beam.Create(s, count) // singleton PCollection<int>
 }
diff --git a/sdks/go/examples/pingpong/pingpong.go 
b/sdks/go/examples/pingpong/pingpong.go
index eb248df73c6..8c0bfe46420 100644
--- a/sdks/go/examples/pingpong/pingpong.go
+++ b/sdks/go/examples/pingpong/pingpong.go
@@ -36,7 +36,7 @@ var (
 
 // stitch constructs two composite PTranformations that provide input to each 
other. It
 // is a (deliberately) complex DAG to show what kind of structures are 
possible.
-func stitch(s *beam.Scope, words beam.PCollection) (beam.PCollection, 
beam.PCollection) {
+func stitch(s beam.Scope, words beam.PCollection) (beam.PCollection, 
beam.PCollection) {
        ping := s.Scope("ping")
        pong := ping // s.Scope("pong")
 
@@ -74,7 +74,7 @@ func multiFn(word string, sample []string, small, big 
func(string)) error {
        return nil
 }
 
-func subset(s *beam.Scope, a, b beam.PCollection) {
+func subset(s beam.Scope, a, b beam.PCollection) {
        beam.ParDo0(s, subsetFn, beam.Impulse(s), beam.SideInput{Input: a}, 
beam.SideInput{Input: b})
 }
 
diff --git a/sdks/go/examples/wordcount/wordcount.go 
b/sdks/go/examples/wordcount/wordcount.go
index d7151717b4f..cd513a48cff 100644
--- a/sdks/go/examples/wordcount/wordcount.go
+++ b/sdks/go/examples/wordcount/wordcount.go
@@ -129,7 +129,7 @@ func formatFn(w string, c int) string {
 // of lines. It expects a PCollection of type string and returns a PCollection
 // of type KV<string,int>. The Beam type checker enforces these constraints
 // during pipeline construction.
-func CountWords(s *beam.Scope, lines beam.PCollection) beam.PCollection {
+func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
        s = s.Scope("CountWords")
 
        // Convert lines of text into individual words.
diff --git a/sdks/go/examples/yatzy/yatzy.go b/sdks/go/examples/yatzy/yatzy.go
index c15295d39b1..07117e57370 100644
--- a/sdks/go/examples/yatzy/yatzy.go
+++ b/sdks/go/examples/yatzy/yatzy.go
@@ -48,7 +48,7 @@ func init() {
 //     0 -> \x.x+1 -> \x.x+1 -> (N times) -> \x.min(x, 6)
 //
 // The single output will be a number between 1 and 6.
-func roll(ctx context.Context, s *beam.Scope) beam.PCollection {
+func roll(ctx context.Context, s beam.Scope) beam.PCollection {
        num := rand.Intn(*real) + 1
        log.Debugf(ctx, "Lucky number %v!", num)
 
diff --git a/sdks/go/pkg/beam/combine.go b/sdks/go/pkg/beam/combine.go
index 42adec4f94d..8ae46c4c3a7 100644
--- a/sdks/go/pkg/beam/combine.go
+++ b/sdks/go/pkg/beam/combine.go
@@ -23,21 +23,21 @@ import (
 
 // Combine inserts a global Combine transform into the pipeline. It
 // expects a PCollection<T> as input where T is a concrete type.
-func Combine(s *Scope, combinefn interface{}, col PCollection, opts ...Option) 
PCollection {
+func Combine(s Scope, combinefn interface{}, col PCollection, opts ...Option) 
PCollection {
        return Must(TryCombine(s, combinefn, col, opts...))
 }
 
 // CombinePerKey inserts a GBK and per-key Combine transform into the 
pipeline. It
 // expects a PCollection<KV<K,T>>. The CombineFn may optionally take a key 
parameter.
-func CombinePerKey(s *Scope, combinefn interface{}, col PCollection, opts 
...Option) PCollection {
+func CombinePerKey(s Scope, combinefn interface{}, col PCollection, opts 
...Option) PCollection {
        return Must(TryCombinePerKey(s, combinefn, col, opts...))
 }
 
 // TryCombine attempts to insert a global Combine transform into the pipeline. 
It may fail
 // for multiple reasons, notably that the combinefn is not valid or cannot be 
bound
 // -- due to type mismatch, say -- to the incoming PCollections.
-func TryCombine(s *Scope, combinefn interface{}, col PCollection, opts 
...Option) (PCollection, error) {
-       side, _, err := validate(col, opts)
+func TryCombine(s Scope, combinefn interface{}, col PCollection, opts 
...Option) (PCollection, error) {
+       side, _, err := validate(s, col, opts)
        if err != nil {
                return PCollection{}, err
        }
@@ -48,8 +48,8 @@ func TryCombine(s *Scope, combinefn interface{}, col 
PCollection, opts ...Option
 // TryCombinePerKey attempts to insert a per-key Combine transform into the 
pipeline. It may fail
 // for multiple reasons, notably that the combinefn is not valid or cannot be 
bound
 // -- due to type mismatch, say -- to the incoming PCollections.
-func TryCombinePerKey(s *Scope, combinefn interface{}, col PCollection, opts 
...Option) (PCollection, error) {
-       side, _, err := validate(col, opts)
+func TryCombinePerKey(s Scope, combinefn interface{}, col PCollection, opts 
...Option) (PCollection, error) {
+       side, _, err := validate(s, col, opts)
        if err != nil {
                return PCollection{}, err
        }
@@ -61,7 +61,7 @@ func TryCombinePerKey(s *Scope, combinefn interface{}, col 
PCollection, opts ...
        return combine(s, combinefn, col, side)
 }
 
-func combine(s *Scope, combinefn interface{}, col PCollection, side 
[]SideInput) (PCollection, error) {
+func combine(s Scope, combinefn interface{}, col PCollection, side 
[]SideInput) (PCollection, error) {
        fn, err := graph.NewCombineFn(combinefn)
        if err != nil {
                return PCollection{}, fmt.Errorf("invalid CombineFn: %v", err)
diff --git a/sdks/go/pkg/beam/create.go b/sdks/go/pkg/beam/create.go
index 08302aaea42..6c190e33a1a 100644
--- a/sdks/go/pkg/beam/create.go
+++ b/sdks/go/pkg/beam/create.go
@@ -36,7 +36,7 @@ func init() {
 // The returned PCollections can be used as any other PCollections. The values
 // are JSON-coded. Each runner may place limits on the sizes of the values and
 // Create should generally only be used for small collections.
-func Create(s *Scope, values ...interface{}) PCollection {
+func Create(s Scope, values ...interface{}) PCollection {
        return Must(TryCreate(s, values...))
 }
 
@@ -45,7 +45,7 @@ func Create(s *Scope, values ...interface{}) PCollection {
 //
 //    list := []string{"a", "b", "c"}
 //    foo := beam.CreateList(s, list)  // foo : W<string>
-func CreateList(s *Scope, list interface{}) PCollection {
+func CreateList(s Scope, list interface{}) PCollection {
        var ret []interface{}
        val := reflect.ValueOf(list)
        if val.Kind() != reflect.Slice && val.Kind() != reflect.Array {
@@ -59,7 +59,7 @@ func CreateList(s *Scope, list interface{}) PCollection {
 
 // TryCreate inserts a fixed set of values into the pipeline. The values must
 // be of the same type.
-func TryCreate(s *Scope, values ...interface{}) (PCollection, error) {
+func TryCreate(s Scope, values ...interface{}) (PCollection, error) {
        if len(values) == 0 {
                return PCollection{}, fmt.Errorf("create has no values")
        }
diff --git a/sdks/go/pkg/beam/external.go b/sdks/go/pkg/beam/external.go
index dfda638b90d..a7bab8f3ea1 100644
--- a/sdks/go/pkg/beam/external.go
+++ b/sdks/go/pkg/beam/external.go
@@ -29,14 +29,14 @@ import (
 // spec provided to implement the behavior of the operation. Transform
 // libraries should expose an API that captures the user's intent and serialize
 // the payload as a byte slice that the runner will deserialize.
-func External(s *Scope, spec string, payload []byte, in []PCollection, out 
[]reflect.Type) []PCollection {
+func External(s Scope, spec string, payload []byte, in []PCollection, out 
[]reflect.Type) []PCollection {
        return MustN(TryExternal(s, spec, payload, in, out))
 }
 
 // TryExternal attempts to perform the work of External, returning an error 
indicating why the operation
 // failed. Failure reasons include the use of side inputs, or an external 
transform that has both inputs
 // and outputs.
-func TryExternal(s *Scope, spec string, payload []byte, in []PCollection, out 
[]reflect.Type) ([]PCollection, error) {
+func TryExternal(s Scope, spec string, payload []byte, in []PCollection, out 
[]reflect.Type) ([]PCollection, error) {
        switch {
        case len(in) == 0 && len(out) == 0:
                return []PCollection{}, fmt.Errorf("External node not 
well-formed: out and in both empty")
@@ -61,7 +61,10 @@ func TryExternal(s *Scope, spec string, payload []byte, in 
[]PCollection, out []
 // primitive. Runners depending on this coding do so AT THEIR OWN RISK and 
will be broken when we convert
 // this implementation to its final internal representation.
 
-func tryExternalSource(s *Scope, spec string, payload []byte, out 
reflect.Type) ([]PCollection, error) {
+func tryExternalSource(s Scope, spec string, payload []byte, out reflect.Type) 
([]PCollection, error) {
+       if !s.IsValid() {
+               return nil, fmt.Errorf("invalid scope")
+       }
        emit := reflect.FuncOf([]reflect.Type{out}, nil, false)
        fnT := reflect.FuncOf([]reflect.Type{emit}, 
[]reflect.Type{reflectx.Error}, false)
 
@@ -81,7 +84,10 @@ func tryExternalSource(s *Scope, spec string, payload 
[]byte, out reflect.Type)
        return []PCollection{ret}, nil
 }
 
-func tryExternalSink(s *Scope, in PCollection, spec string, payload []byte) 
([]PCollection, error) {
+func tryExternalSink(s Scope, in PCollection, spec string, payload []byte) 
([]PCollection, error) {
+       if !s.IsValid() {
+               return nil, fmt.Errorf("invalid scope")
+       }
        if !in.IsValid() {
                return []PCollection{}, fmt.Errorf("invalid main pcollection")
        }
diff --git a/sdks/go/pkg/beam/flatten.go b/sdks/go/pkg/beam/flatten.go
index e46cc63a25c..6baaec0d22b 100644
--- a/sdks/go/pkg/beam/flatten.go
+++ b/sdks/go/pkg/beam/flatten.go
@@ -35,14 +35,17 @@ import (
 //
 // By default, the Coder of the output PCollection is the same as the Coder
 // of the first PCollection.
-func Flatten(s *Scope, cols ...PCollection) PCollection {
+func Flatten(s Scope, cols ...PCollection) PCollection {
        return Must(TryFlatten(s, cols...))
 }
 
 // TryFlatten merges incoming PCollections of type 'A' to a single PCollection
 // of type 'A'. Returns an error indicating the set of PCollections that could
 // not be flattened.
-func TryFlatten(s *Scope, cols ...PCollection) (PCollection, error) {
+func TryFlatten(s Scope, cols ...PCollection) (PCollection, error) {
+       if !s.IsValid() {
+               return PCollection{}, fmt.Errorf("invalid scope")
+       }
        for i, in := range cols {
                if !in.IsValid() {
                        return PCollection{}, fmt.Errorf("invalid pcollection 
to flatten: index %v", i)
diff --git a/sdks/go/pkg/beam/gbk.go b/sdks/go/pkg/beam/gbk.go
index 99786ed1259..3c11bea1e20 100644
--- a/sdks/go/pkg/beam/gbk.go
+++ b/sdks/go/pkg/beam/gbk.go
@@ -54,7 +54,7 @@ import (
 //
 // See CoGroupByKey for a way to group multiple input PCollections by a common
 // key at once.
-func GroupByKey(s *Scope, a PCollection) PCollection {
+func GroupByKey(s Scope, a PCollection) PCollection {
        return Must(TryGroupByKey(s, a))
 }
 
@@ -63,7 +63,10 @@ func GroupByKey(s *Scope, a PCollection) PCollection {
 
 // TryGroupByKey inserts a GBK transform into the pipeline. Returns
 // an error on failure.
-func TryGroupByKey(s *Scope, a PCollection) (PCollection, error) {
+func TryGroupByKey(s Scope, a PCollection) (PCollection, error) {
+       if !s.IsValid() {
+               return PCollection{}, fmt.Errorf("invalid scope")
+       }
        if !a.IsValid() {
                return PCollection{}, fmt.Errorf("invalid pcollection to GBK")
        }
@@ -77,12 +80,12 @@ func TryGroupByKey(s *Scope, a PCollection) (PCollection, 
error) {
 }
 
 // CoGroupByKey inserts a CoGBK transform into the pipeline.
-func CoGroupByKey(s *Scope, cols ...PCollection) PCollection {
+func CoGroupByKey(s Scope, cols ...PCollection) PCollection {
        return Must(TryCoGroupByKey(s, cols...))
 }
 
 // TryCoGroupByKey inserts a CoGBK transform into the pipeline. Returns
 // an error on failure.
-func TryCoGroupByKey(s *Scope, cols ...PCollection) (PCollection, error) {
+func TryCoGroupByKey(s Scope, cols ...PCollection) (PCollection, error) {
        panic("NYI")
 }
diff --git a/sdks/go/pkg/beam/impulse.go b/sdks/go/pkg/beam/impulse.go
index 07c3057eb4c..df44d673b1a 100644
--- a/sdks/go/pkg/beam/impulse.go
+++ b/sdks/go/pkg/beam/impulse.go
@@ -26,7 +26,7 @@ import (
 //
 // The purpose of Impulse is to trigger another transform, such as
 // ones that take all information as side inputs.
-func Impulse(s *Scope) PCollection {
+func Impulse(s Scope) PCollection {
        return ImpulseValue(s, []byte{})
 }
 
@@ -35,7 +35,10 @@ func Impulse(s *Scope) PCollection {
 //
 //   foo := beam.ImpulseValue(s, []byte{})  // foo : W<[]byte>
 //
-func ImpulseValue(s *Scope, value []byte) PCollection {
+func ImpulseValue(s Scope, value []byte) PCollection {
+       if !s.IsValid() {
+               panic("Invalid scope")
+       }
        edge := graph.NewImpulse(s.real, s.scope, value)
        return PCollection{edge.Output[0].To}
 }
diff --git a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go 
b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
index 1eff568055b..1796e1dbecf 100644
--- a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
+++ b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
@@ -71,7 +71,7 @@ func NewQualifiedTableName(s string) (QualifiedTableName, 
error) {
 // Read reads all rows from the given table. The table must have a schema
 // compatible with the given type, t, and Read returns a PCollection<t>. If the
 // table has more rows than t, then Read is implicitly a projection.
-func Read(s *beam.Scope, project, table string, t reflect.Type) 
beam.PCollection {
+func Read(s beam.Scope, project, table string, t reflect.Type) 
beam.PCollection {
        mustParseTable(table)
 
        s = s.Scope("bigquery.Read")
@@ -83,12 +83,12 @@ func Read(s *beam.Scope, project, table string, t 
reflect.Type) beam.PCollection
 
 // Query executes a query. The output must have a schema compatible with the 
given
 // type, t. It returns a PCollection<t>.
-func Query(s *beam.Scope, project, q string, t reflect.Type) beam.PCollection {
+func Query(s beam.Scope, project, q string, t reflect.Type) beam.PCollection {
        s = s.Scope("bigquery.Query")
        return query(s, project, q, t)
 }
 
-func query(s *beam.Scope, project, query string, t reflect.Type) 
beam.PCollection {
+func query(s beam.Scope, project, query string, t reflect.Type) 
beam.PCollection {
        mustInferSchema(t)
 
        imp := beam.Impulse(s)
@@ -155,7 +155,7 @@ func mustParseTable(table string) QualifiedTableName {
 
 // Write writes the elements of the given PCollection<T> to bigquery. T is 
required
 // to be the schema type.
-func Write(s *beam.Scope, project, table string, col beam.PCollection) {
+func Write(s beam.Scope, project, table string, col beam.PCollection) {
        t := typex.SkipW(col.Type()).Type()
        mustInferSchema(t)
        qn := mustParseTable(table)
diff --git a/sdks/go/pkg/beam/io/textio/textio.go 
b/sdks/go/pkg/beam/io/textio/textio.go
index fb6f21217eb..000e873dcb7 100644
--- a/sdks/go/pkg/beam/io/textio/textio.go
+++ b/sdks/go/pkg/beam/io/textio/textio.go
@@ -34,7 +34,7 @@ func init() {
 
 // Read reads a set of file and returns the lines as a PCollection<string>. The
 // newlines are not part of the lines.
-func Read(s *beam.Scope, glob string) beam.PCollection {
+func Read(s beam.Scope, glob string) beam.PCollection {
        s = s.Scope("textio.Read")
 
        validateScheme(glob)
@@ -70,13 +70,13 @@ func newFileSystem(ctx context.Context, glob string) 
(FileSystem, error) {
 // ReadAll expands and reads the filename given as globs by the incoming
 // PCollection<string>. It returns the lines of all files as a single
 // PCollection<string>. The newlines are not part of the lines.
-func ReadAll(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func ReadAll(s beam.Scope, col beam.PCollection) beam.PCollection {
        s = s.Scope("textio.ReadAll")
 
        return read(s, col)
 }
 
-func read(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func read(s beam.Scope, col beam.PCollection) beam.PCollection {
        files := beam.ParDo(s, expandFn, col)
        return beam.ParDo(s, readFn, files)
 }
@@ -129,7 +129,7 @@ func readFn(ctx context.Context, filename string, emit 
func(string)) error {
 
 // Write writes a PCollection<string> to a file as separate lines. The
 // writer add a newline after each element.
-func Write(s *beam.Scope, filename string, col beam.PCollection) {
+func Write(s beam.Scope, filename string, col beam.PCollection) {
        s = s.Scope("textio.Write")
 
        validateScheme(filename)
@@ -182,7 +182,7 @@ func (w *writeFileFn) Teardown() error {
 
 // Immediate reads a local file at pipeline construction-time and embeds the
 // data into a I/O-free pipeline source. Should be used for small files only.
-func Immediate(s *beam.Scope, filename string) (beam.PCollection, error) {
+func Immediate(s beam.Scope, filename string) (beam.PCollection, error) {
        s = s.Scope("textio.Immediate")
 
        var data []interface{}
diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go
index 7c64c22dadc..fa771572366 100644
--- a/sdks/go/pkg/beam/pardo.go
+++ b/sdks/go/pkg/beam/pardo.go
@@ -24,8 +24,8 @@ import (
 // TryParDo attempts to insert a ParDo transform into the pipeline. It may fail
 // for multiple reasons, notably that the dofn is not valid or cannot be bound
 // -- due to type mismatch, say -- to the incoming PCollections.
-func TryParDo(s *Scope, dofn interface{}, col PCollection, opts ...Option) 
([]PCollection, error) {
-       side, typedefs, err := validate(col, opts)
+func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) 
([]PCollection, error) {
+       side, typedefs, err := validate(s, col, opts)
        if err != nil {
                return nil, err
        }
@@ -54,12 +54,12 @@ func TryParDo(s *Scope, dofn interface{}, col PCollection, 
opts ...Option) ([]PC
 }
 
 // ParDoN inserts a ParDo with any number of outputs into the pipeline.
-func ParDoN(s *Scope, dofn interface{}, col PCollection, opts ...Option) 
[]PCollection {
+func ParDoN(s Scope, dofn interface{}, col PCollection, opts ...Option) 
[]PCollection {
        return MustN(TryParDo(s, dofn, col, opts...))
 }
 
 // ParDo0 inserts a ParDo with zero output transform into the pipeline.
-func ParDo0(s *Scope, dofn interface{}, col PCollection, opts ...Option) {
+func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option) {
        ret := MustN(TryParDo(s, dofn, col, opts...))
        if len(ret) != 0 {
                panic(fmt.Sprintf("expected 0 output. Found: %v", ret))
@@ -249,7 +249,7 @@ func ParDo0(s *Scope, dofn interface{}, col PCollection, 
opts ...Option) {
 //
 // See 
https://beam.apache.org/documentation/programming-guide/#transforms-pardo";
 // for the web documentation for ParDo
-func ParDo(s *Scope, dofn interface{}, col PCollection, opts ...Option) 
PCollection {
+func ParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) 
PCollection {
        ret := MustN(TryParDo(s, dofn, col, opts...))
        if len(ret) != 1 {
                panic(fmt.Sprintf("expected 1 output. Found: %v", ret))
@@ -260,7 +260,7 @@ func ParDo(s *Scope, dofn interface{}, col PCollection, 
opts ...Option) PCollect
 // TODO(herohde) 6/1/2017: add windowing aspects to above documentation.
 
 // ParDo2 inserts a ParDo with 2 outputs into the pipeline.
-func ParDo2(s *Scope, dofn interface{}, col PCollection, opts ...Option) 
(PCollection, PCollection) {
+func ParDo2(s Scope, dofn interface{}, col PCollection, opts ...Option) 
(PCollection, PCollection) {
        ret := MustN(TryParDo(s, dofn, col, opts...))
        if len(ret) != 2 {
                panic(fmt.Sprintf("expected 2 output. Found: %v", ret))
@@ -269,7 +269,7 @@ func ParDo2(s *Scope, dofn interface{}, col PCollection, 
opts ...Option) (PColle
 }
 
 // ParDo3 inserts a ParDo with 3 outputs into the pipeline.
-func ParDo3(s *Scope, dofn interface{}, col PCollection, opts ...Option) 
(PCollection, PCollection, PCollection) {
+func ParDo3(s Scope, dofn interface{}, col PCollection, opts ...Option) 
(PCollection, PCollection, PCollection) {
        ret := MustN(TryParDo(s, dofn, col, opts...))
        if len(ret) != 3 {
                panic(fmt.Sprintf("expected 3 output. Found: %v", ret))
@@ -278,7 +278,7 @@ func ParDo3(s *Scope, dofn interface{}, col PCollection, 
opts ...Option) (PColle
 }
 
 // ParDo4 inserts a ParDo with 4 outputs into the pipeline.
-func ParDo4(s *Scope, dofn interface{}, col PCollection, opts ...Option) 
(PCollection, PCollection, PCollection, PCollection) {
+func ParDo4(s Scope, dofn interface{}, col PCollection, opts ...Option) 
(PCollection, PCollection, PCollection, PCollection) {
        ret := MustN(TryParDo(s, dofn, col, opts...))
        if len(ret) != 4 {
                panic(fmt.Sprintf("expected 4 output. Found: %v", ret))
@@ -287,7 +287,7 @@ func ParDo4(s *Scope, dofn interface{}, col PCollection, 
opts ...Option) (PColle
 }
 
 // ParDo5 inserts a ParDo with 5 outputs into the pipeline.
-func ParDo5(s *Scope, dofn interface{}, col PCollection, opts ...Option) 
(PCollection, PCollection, PCollection, PCollection, PCollection) {
+func ParDo5(s Scope, dofn interface{}, col PCollection, opts ...Option) 
(PCollection, PCollection, PCollection, PCollection, PCollection) {
        ret := MustN(TryParDo(s, dofn, col, opts...))
        if len(ret) != 5 {
                panic(fmt.Sprintf("expected 5 output. Found: %v", ret))
@@ -296,7 +296,7 @@ func ParDo5(s *Scope, dofn interface{}, col PCollection, 
opts ...Option) (PColle
 }
 
 // ParDo6 inserts a ParDo with 6 outputs into the pipeline.
-func ParDo6(s *Scope, dofn interface{}, col PCollection, opts ...Option) 
(PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) {
+func ParDo6(s Scope, dofn interface{}, col PCollection, opts ...Option) 
(PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) {
        ret := MustN(TryParDo(s, dofn, col, opts...))
        if len(ret) != 6 {
                panic(fmt.Sprintf("expected 6 output. Found: %v", ret))
@@ -305,7 +305,7 @@ func ParDo6(s *Scope, dofn interface{}, col PCollection, 
opts ...Option) (PColle
 }
 
 // ParDo7 inserts a ParDo with 7 outputs into the pipeline.
-func ParDo7(s *Scope, dofn interface{}, col PCollection, opts ...Option) 
(PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, 
PCollection) {
+func ParDo7(s Scope, dofn interface{}, col PCollection, opts ...Option) 
(PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, 
PCollection) {
        ret := MustN(TryParDo(s, dofn, col, opts...))
        if len(ret) != 7 {
                panic(fmt.Sprintf("expected 7 output. Found: %v", ret))
diff --git a/sdks/go/pkg/beam/partition.go b/sdks/go/pkg/beam/partition.go
index 37ab7d0a5bd..9a26d852292 100644
--- a/sdks/go/pkg/beam/partition.go
+++ b/sdks/go/pkg/beam/partition.go
@@ -34,7 +34,7 @@ var (
 // Partition takes a PCollection<T> and a PartitionFn, uses the PartitionFn to
 // split the elements of the input PCollection into N partitions, and returns
 // a []PCollection<T> that bundles N PCollection<T>s containing the split 
elements.
-func Partition(s *Scope, n int, fn interface{}, col PCollection) []PCollection 
{
+func Partition(s Scope, n int, fn interface{}, col PCollection) []PCollection {
        s = s.Scope(fmt.Sprintf("Partition(%v)", n))
 
        if n < 1 {
diff --git a/sdks/go/pkg/beam/pipeline.go b/sdks/go/pkg/beam/pipeline.go
index c911c3fe4c7..33254e18236 100644
--- a/sdks/go/pkg/beam/pipeline.go
+++ b/sdks/go/pkg/beam/pipeline.go
@@ -30,14 +30,26 @@ type Scope struct {
        real *graph.Graph
 }
 
+// IsValid returns true iff the Scope is valid. Any use of an invalid Scope
+// will result in a panic.
+func (s Scope) IsValid() bool {
+       return s.real != nil && s.scope != nil
+}
+
 // Scope returns a sub-scope with the given name. The name provided may
 // be augmented to ensure uniqueness.
-func (s *Scope) Scope(name string) *Scope {
+func (s Scope) Scope(name string) Scope {
+       if !s.IsValid() {
+               panic("Invalid Scope")
+       }
        scope := s.real.NewScope(s.scope, name)
-       return &Scope{scope: scope, real: s.real}
+       return Scope{scope: scope, real: s.real}
 }
 
-func (s *Scope) String() string {
+func (s Scope) String() string {
+       if !s.IsValid() {
+               return "<invalid>"
+       }
        return s.scope.String()
 }
 
@@ -57,8 +69,8 @@ func NewPipeline() *Pipeline {
 }
 
 // Root returns the root scope of the pipeline.
-func (p *Pipeline) Root() *Scope {
-       return &Scope{scope: p.real.Root(), real: p.real}
+func (p *Pipeline) Root() Scope {
+       return Scope{scope: p.real.Root(), real: p.real}
 }
 
 // TODO(herohde) 11/13/2017: consider making Build return the model Pipeline 
proto
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 13208af0ddc..367bdc6e444 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -129,7 +129,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                        }),
                        Version: newMsg(version{
                                JobType: "FNAPI_BATCH",
-                               Major:   "1",
+                               Major:   "6",
                        }),
                        SdkPipelineOptions: newMsg(pipelineOptions{
                                DisplayData: findPipelineFlags(),
diff --git a/sdks/go/pkg/beam/testing/passert/passert.go 
b/sdks/go/pkg/beam/testing/passert/passert.go
index ec546ba003a..38620f5a60a 100644
--- a/sdks/go/pkg/beam/testing/passert/passert.go
+++ b/sdks/go/pkg/beam/testing/passert/passert.go
@@ -40,7 +40,7 @@ func init() {
 // Equals verifies the given collection has the same values as the given
 // values, under coder equality. The values can be provided as single
 // PCollection.
-func Equals(s *beam.Scope, col beam.PCollection, values ...interface{}) 
beam.PCollection {
+func Equals(s beam.Scope, col beam.PCollection, values ...interface{}) 
beam.PCollection {
        if len(values) == 0 {
                return Empty(s, col)
        }
@@ -53,7 +53,7 @@ func Equals(s *beam.Scope, col beam.PCollection, values 
...interface{}) beam.PCo
 }
 
 // equals verifies that the actual values match the expected ones.
-func equals(s *beam.Scope, actual, expected beam.PCollection) beam.PCollection 
{
+func equals(s beam.Scope, actual, expected beam.PCollection) beam.PCollection {
        bad, _, bad2 := Diff(s, actual, expected)
        fail(s, bad, "value %v present, but not expected")
        fail(s, bad2, "value %v expected, but not present")
@@ -64,7 +64,7 @@ func equals(s *beam.Scope, actual, expected beam.PCollection) 
beam.PCollection {
 // preserved, so a value may appear multiple times and in multiple 
collections. Coder
 // equality is used to determine equality. Should only be used for small 
collections,
 // because all values are held in memory at the same time.
-func Diff(s *beam.Scope, a, b beam.PCollection) (left, both, right 
beam.PCollection) {
+func Diff(s beam.Scope, a, b beam.PCollection) (left, both, right 
beam.PCollection) {
        imp := beam.Impulse(s)
        return beam.ParDo3(s, &diffFn{Coder: beam.EncodedCoder{Coder: 
a.Coder()}}, imp, beam.SideInput{Input: a}, beam.SideInput{Input: b})
 }
@@ -163,24 +163,24 @@ func encode(c *coder.Coder, value interface{}) (string, 
error) {
 }
 
 // True asserts that all elements satisfy the given predicate.
-func True(s *beam.Scope, col beam.PCollection, fn interface{}) 
beam.PCollection {
+func True(s beam.Scope, col beam.PCollection, fn interface{}) beam.PCollection 
{
        fail(s, filter.Exclude(s, col, fn), "predicate(%v) = false, want true")
        return col
 }
 
 // False asserts that the given predicate does not satisfy any element in the 
condition.
-func False(s *beam.Scope, col beam.PCollection, fn interface{}) 
beam.PCollection {
+func False(s beam.Scope, col beam.PCollection, fn interface{}) 
beam.PCollection {
        fail(s, filter.Include(s, col, fn), "predicate(%v) = true, want false")
        return col
 }
 
 // Empty asserts that col is empty.
-func Empty(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func Empty(s beam.Scope, col beam.PCollection) beam.PCollection {
        fail(s, col, "PCollection contains %v, want empty collection")
        return col
 }
 
-func fail(s *beam.Scope, col beam.PCollection, format string) {
+func fail(s beam.Scope, col beam.PCollection, format string) {
        switch {
        case typex.IsWKV(col.Type()):
                beam.ParDo0(s, &failKVFn{Format: format}, col)
diff --git a/sdks/go/pkg/beam/testing/ptest/ptest.go 
b/sdks/go/pkg/beam/testing/ptest/ptest.go
index dd0e386e809..478fcc94812 100644
--- a/sdks/go/pkg/beam/testing/ptest/ptest.go
+++ b/sdks/go/pkg/beam/testing/ptest/ptest.go
@@ -25,28 +25,28 @@ import (
 // TODO(herohde) 7/10/2017: add hooks to verify counters, logs, etc.
 
 // Create creates a pipeline and a PCollection with the given values.
-func Create(values []interface{}) (*beam.Pipeline, *beam.Scope, 
beam.PCollection) {
+func Create(values []interface{}) (*beam.Pipeline, beam.Scope, 
beam.PCollection) {
        p := beam.NewPipeline()
        s := p.Root()
        return p, s, beam.Create(s, values...)
 }
 
 // CreateList creates a pipeline and a PCollection with the given values.
-func CreateList(values interface{}) (*beam.Pipeline, *beam.Scope, 
beam.PCollection) {
+func CreateList(values interface{}) (*beam.Pipeline, beam.Scope, 
beam.PCollection) {
        p := beam.NewPipeline()
        s := p.Root()
        return p, s, beam.CreateList(s, values)
 }
 
 // Create2 creates a pipeline and 2 PCollections with the given values.
-func Create2(a, b []interface{}) (*beam.Pipeline, *beam.Scope, 
beam.PCollection, beam.PCollection) {
+func Create2(a, b []interface{}) (*beam.Pipeline, beam.Scope, 
beam.PCollection, beam.PCollection) {
        p := beam.NewPipeline()
        s := p.Root()
        return p, s, beam.Create(s, a...), beam.Create(s, b...)
 }
 
 // CreateList2 creates a pipeline and 2 PCollections with the given values.
-func CreateList2(a, b interface{}) (*beam.Pipeline, *beam.Scope, 
beam.PCollection, beam.PCollection) {
+func CreateList2(a, b interface{}) (*beam.Pipeline, beam.Scope, 
beam.PCollection, beam.PCollection) {
        p := beam.NewPipeline()
        s := p.Root()
        return p, s, beam.CreateList(s, a), beam.CreateList(s, b)
diff --git a/sdks/go/pkg/beam/transforms/filter/distinct.go 
b/sdks/go/pkg/beam/transforms/filter/distinct.go
index 3d7e453c68a..cf28fd84579 100644
--- a/sdks/go/pkg/beam/transforms/filter/distinct.go
+++ b/sdks/go/pkg/beam/transforms/filter/distinct.go
@@ -22,7 +22,7 @@ import (
 // Distinct removes all duplicates from a collection, under coder equality. It
 // expects a PCollection<T> as input and returns a PCollection<T> with
 // duplicates removed.
-func Distinct(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func Distinct(s beam.Scope, col beam.PCollection) beam.PCollection {
        s = s.Scope("filter.Distinct")
 
        pre := beam.ParDo(s, mapFn, col)
diff --git a/sdks/go/pkg/beam/transforms/filter/filter.go 
b/sdks/go/pkg/beam/transforms/filter/filter.go
index 779d9180007..3a6f5881b35 100644
--- a/sdks/go/pkg/beam/transforms/filter/filter.go
+++ b/sdks/go/pkg/beam/transforms/filter/filter.go
@@ -44,7 +44,7 @@ func init() {
 //    })
 //
 // Here, "short" will contain "a" and "b" at runtime.
-func Include(s *beam.Scope, col beam.PCollection, fn interface{}) 
beam.PCollection {
+func Include(s beam.Scope, col beam.PCollection, fn interface{}) 
beam.PCollection {
        s = s.Scope("filter.Include")
 
        t := typex.SkipW(col.Type()).Type()
@@ -64,7 +64,7 @@ func Include(s *beam.Scope, col beam.PCollection, fn 
interface{}) beam.PCollecti
 //    })
 //
 // Here, "long" will contain "long" and "alsolong" at runtime.
-func Exclude(s *beam.Scope, col beam.PCollection, fn interface{}) 
beam.PCollection {
+func Exclude(s beam.Scope, col beam.PCollection, fn interface{}) 
beam.PCollection {
        s = s.Scope("filter.Exclude")
 
        t := typex.SkipW(col.Type()).Type()
diff --git a/sdks/go/pkg/beam/transforms/stats/count.go 
b/sdks/go/pkg/beam/transforms/stats/count.go
index 489aa3f247a..38f3dd26b0e 100644
--- a/sdks/go/pkg/beam/transforms/stats/count.go
+++ b/sdks/go/pkg/beam/transforms/stats/count.go
@@ -22,7 +22,7 @@ import (
 // Count counts the number of elements in a collection. It expects a
 // PCollection<T> as input and returns a PCollection<KV<T,int>>. T's encoding
 // must be a well-defined injection.
-func Count(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func Count(s beam.Scope, col beam.PCollection) beam.PCollection {
        s = s.Scope("stats.Count")
 
        pre := beam.ParDo(s, mapFn, col)
diff --git a/sdks/go/pkg/beam/transforms/stats/max.go 
b/sdks/go/pkg/beam/transforms/stats/max.go
index 8dcf405f06f..20235696877 100644
--- a/sdks/go/pkg/beam/transforms/stats/max.go
+++ b/sdks/go/pkg/beam/transforms/stats/max.go
@@ -30,7 +30,7 @@ import (
 //    col := beam.Create(s, 1, 11, 7, 5, 10)
 //    max := stats.Max(s, col)   // PCollection<int> with 11 as the only 
element.
 //
-func Max(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func Max(s beam.Scope, col beam.PCollection) beam.PCollection {
        s = s.Scope("stats.Max")
        return combine(s, findMaxFn, col)
 }
@@ -38,7 +38,7 @@ func Max(s *beam.Scope, col beam.PCollection) 
beam.PCollection {
 // MaxPerKey returns the maximal element per key in a PCollection<KV<A,B>> as
 // a PCollection<KV<A,B>>. It can only be used for numbers, such as int,
 // uint16, float32, etc.
-func MaxPerKey(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func MaxPerKey(s beam.Scope, col beam.PCollection) beam.PCollection {
        s = s.Scope("stats.MaxPerKey")
        return combinePerKey(s, findMaxFn, col)
 }
diff --git a/sdks/go/pkg/beam/transforms/stats/mean.go 
b/sdks/go/pkg/beam/transforms/stats/mean.go
index ec646eb10dc..9d68231ca1b 100644
--- a/sdks/go/pkg/beam/transforms/stats/mean.go
+++ b/sdks/go/pkg/beam/transforms/stats/mean.go
@@ -31,7 +31,7 @@ import (
 //    col := beam.Create(s, 1, 11, 7, 5, 10)
 //    mean := stats.Mean(s, col)   // PCollection<float64> with 6.8 as the 
only element.
 //
-func Mean(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func Mean(s beam.Scope, col beam.PCollection) beam.PCollection {
        s = s.Scope("stats.Mean")
 
        t := beam.ValidateNonCompositeType(col)
@@ -44,7 +44,7 @@ func Mean(s *beam.Scope, col beam.PCollection) 
beam.PCollection {
 // in a collection. It expects a PCollection<KV<A,B>> as input and returns a
 // PCollection<KV<A,float64>>. It can only be used for numbers, such as int,
 // uint16, float32, etc.
-func MeanPerKey(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func MeanPerKey(s beam.Scope, col beam.PCollection) beam.PCollection {
        s = s.Scope("stats.MeanPerKey")
 
        _, t := beam.ValidateKVType(col)
diff --git a/sdks/go/pkg/beam/transforms/stats/min.go 
b/sdks/go/pkg/beam/transforms/stats/min.go
index 227b9bb6e8c..cf6723dc27a 100644
--- a/sdks/go/pkg/beam/transforms/stats/min.go
+++ b/sdks/go/pkg/beam/transforms/stats/min.go
@@ -30,7 +30,7 @@ import (
 //    col := beam.Create(s, 1, 11, 7, 5, 10)
 //    min := stats.Min(s, col)   // PCollection<int> with 1 as the only 
element.
 //
-func Min(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func Min(s beam.Scope, col beam.PCollection) beam.PCollection {
        s = s.Scope("stats.Min")
        return combine(s, findMinFn, col)
 }
@@ -38,7 +38,7 @@ func Min(s *beam.Scope, col beam.PCollection) 
beam.PCollection {
 // MinPerKey returns the minimal element per key in a PCollection<KV<A,B>> as
 // a PCollection<KV<A,B>>. It can only be used for numbers, such as int,
 // uint16, float32, etc.
-func MinPerKey(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func MinPerKey(s beam.Scope, col beam.PCollection) beam.PCollection {
        s = s.Scope("stats.MinPerKey")
        return combinePerKey(s, findMinFn, col)
 }
diff --git a/sdks/go/pkg/beam/transforms/stats/sum.go 
b/sdks/go/pkg/beam/transforms/stats/sum.go
index 445a3a423b7..e3036e71691 100644
--- a/sdks/go/pkg/beam/transforms/stats/sum.go
+++ b/sdks/go/pkg/beam/transforms/stats/sum.go
@@ -30,7 +30,7 @@ import (
 //    col := beam.Create(s, 1, 11, 7, 5, 10)
 //    sum := stats.Sum(s, col)   // PCollection<int> with 34 as the only 
element.
 //
-func Sum(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func Sum(s beam.Scope, col beam.PCollection) beam.PCollection {
        s = s.Scope("stats.Sum")
        return combine(s, findSumFn, col)
 }
@@ -38,7 +38,7 @@ func Sum(s *beam.Scope, col beam.PCollection) 
beam.PCollection {
 // SumPerKey returns the sum of the values per key in a PCollection<KV<A,B>> as
 // a PCollection<KV<A,B>>. It can only be used for value numbers, such as int,
 // uint16, float32, etc.
-func SumPerKey(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func SumPerKey(s beam.Scope, col beam.PCollection) beam.PCollection {
        s = s.Scope("stats.SumPerKey")
        return combinePerKey(s, findSumFn, col)
 }
diff --git a/sdks/go/pkg/beam/transforms/stats/util.go 
b/sdks/go/pkg/beam/transforms/stats/util.go
index 6c3a7f04f60..970c269f048 100644
--- a/sdks/go/pkg/beam/transforms/stats/util.go
+++ b/sdks/go/pkg/beam/transforms/stats/util.go
@@ -23,7 +23,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
-func combine(s *beam.Scope, makeCombineFn func(reflect.Type) interface{}, col 
beam.PCollection) beam.PCollection {
+func combine(s beam.Scope, makeCombineFn func(reflect.Type) interface{}, col 
beam.PCollection) beam.PCollection {
        t := beam.ValidateNonCompositeType(col)
        validateNonComplexNumber(t.Type())
 
@@ -32,7 +32,7 @@ func combine(s *beam.Scope, makeCombineFn func(reflect.Type) 
interface{}, col be
        return beam.Combine(s, makeCombineFn(t.Type()), col)
 }
 
-func combinePerKey(s *beam.Scope, makeCombineFn func(reflect.Type) 
interface{}, col beam.PCollection) beam.PCollection {
+func combinePerKey(s beam.Scope, makeCombineFn func(reflect.Type) interface{}, 
col beam.PCollection) beam.PCollection {
        _, t := beam.ValidateKVType(col)
        validateNonComplexNumber(t.Type())
 
diff --git a/sdks/go/pkg/beam/transforms/top/top.go 
b/sdks/go/pkg/beam/transforms/top/top.go
index bbc5f353657..630450101ae 100644
--- a/sdks/go/pkg/beam/transforms/top/top.go
+++ b/sdks/go/pkg/beam/transforms/top/top.go
@@ -45,7 +45,7 @@ func init() {
 //    col := beam.Create(s, 1, 11, 7, 5, 10)
 //    top2 := stats.Largest(s, col, 2, less)  // PCollection<[]int> with [11, 
10] as the only element.
 //
-func Largest(s *beam.Scope, col beam.PCollection, n int, less interface{}) 
beam.PCollection {
+func Largest(s beam.Scope, col beam.PCollection, n int, less interface{}) 
beam.PCollection {
        s = s.Scope(fmt.Sprintf("top.Largest(%v)", n))
 
        t := beam.ValidateNonCompositeType(col)
@@ -58,7 +58,7 @@ func Largest(s *beam.Scope, col beam.PCollection, n int, less 
interface{}) beam.
 // The order is defined by the comparator, less : T x T -> bool. It returns a
 // single-element PCollection<KV<K,[]T>> with a slice of the N largest 
elements for
 // each key.
-func LargestPerKey(s *beam.Scope, col beam.PCollection, n int, less 
interface{}) beam.PCollection {
+func LargestPerKey(s beam.Scope, col beam.PCollection, n int, less 
interface{}) beam.PCollection {
        s = s.Scope(fmt.Sprintf("top.LargestPerKey(%v)", n))
 
        _, t := beam.ValidateKVType(col)
@@ -76,7 +76,7 @@ func LargestPerKey(s *beam.Scope, col beam.PCollection, n 
int, less interface{})
 //    col := beam.Create(s, 1, 11, 7, 5, 10)
 //    bottom2 := stats.Smallest(s, col, 2, less)  // PCollection<[]int> with 
[1, 5] as the only element.
 //
-func Smallest(s *beam.Scope, col beam.PCollection, n int, less interface{}) 
beam.PCollection {
+func Smallest(s beam.Scope, col beam.PCollection, n int, less interface{}) 
beam.PCollection {
        s = s.Scope(fmt.Sprintf("top.Smallest(%v)", n))
 
        t := beam.ValidateNonCompositeType(col)
@@ -89,7 +89,7 @@ func Smallest(s *beam.Scope, col beam.PCollection, n int, 
less interface{}) beam
 // The order is defined by the comparator, less : T x T -> bool. It returns a
 // single-element PCollection<KV<K,[]T>> with a slice of the N smallest 
elements for
 // each key.
-func SmallestPerKey(s *beam.Scope, col beam.PCollection, n int, less 
interface{}) beam.PCollection {
+func SmallestPerKey(s beam.Scope, col beam.PCollection, n int, less 
interface{}) beam.PCollection {
        s = s.Scope(fmt.Sprintf("top.SmallestPerKey(%v)", n))
 
        _, t := beam.ValidateKVType(col)
diff --git a/sdks/go/pkg/beam/util.go b/sdks/go/pkg/beam/util.go
index ae49640890e..bc8deabb170 100644
--- a/sdks/go/pkg/beam/util.go
+++ b/sdks/go/pkg/beam/util.go
@@ -26,7 +26,7 @@ package beam
 
 // Seq is a convenience helper to chain single-input/single-output ParDos 
together
 // in a sequence.
-func Seq(s *Scope, col PCollection, dofns ...interface{}) PCollection {
+func Seq(s Scope, col PCollection, dofns ...interface{}) PCollection {
        cur := col
        for _, dofn := range dofns {
                cur = ParDo(s, dofn, cur)
@@ -36,7 +36,7 @@ func Seq(s *Scope, col PCollection, dofns ...interface{}) 
PCollection {
 
 // DropKey drops the key for an input PCollection<KV<A,B>>. It returns
 // a PCollection<B>.
-func DropKey(s *Scope, col PCollection) PCollection {
+func DropKey(s Scope, col PCollection) PCollection {
        return ParDo(s, dropKeyFn, col)
 }
 
@@ -46,7 +46,7 @@ func dropKeyFn(_ X, y Y) Y {
 
 // DropValue drops the value for an input PCollection<KV<A,B>>. It returns
 // a PCollection<A>.
-func DropValue(s *Scope, col PCollection) PCollection {
+func DropValue(s Scope, col PCollection) PCollection {
        return ParDo(s, dropValueFn, col)
 }
 
@@ -56,7 +56,7 @@ func dropValueFn(x X, _ Y) X {
 
 // SwapKV swaps the key and value for an input PCollection<KV<A,B>>. It returns
 // a PCollection<KV<B,A>>.
-func SwapKV(s *Scope, col PCollection) PCollection {
+func SwapKV(s Scope, col PCollection) PCollection {
        return ParDo(s, swapKVFn, col)
 }
 
@@ -72,7 +72,7 @@ func swapKVFn(x X, y Y) (Y, X) {
 //    d := top.Top(s, merged, 5, ...)    // PCollection<[]A>
 //    top5 := beam.Explode(s, d)
 //
-func Explode(s *Scope, col PCollection) PCollection {
+func Explode(s Scope, col PCollection) PCollection {
        s = s.Scope("beam.Explode")
        return ParDo(s, explodeFn, col)
 }
diff --git a/sdks/go/pkg/beam/validate.go b/sdks/go/pkg/beam/validate.go
index ad43a0c85fc..f08e5d0a3aa 100644
--- a/sdks/go/pkg/beam/validate.go
+++ b/sdks/go/pkg/beam/validate.go
@@ -44,7 +44,10 @@ func ValidateNonCompositeType(col PCollection) 
typex.FullType {
 
 // validate validates and processes the input collection and options. Private 
convenience
 // function.
-func validate(col PCollection, opts []Option) ([]SideInput, 
map[string]reflect.Type, error) {
+func validate(s Scope, col PCollection, opts []Option) ([]SideInput, 
map[string]reflect.Type, error) {
+       if !s.IsValid() {
+               return nil, nil, fmt.Errorf("invalid scope")
+       }
        if !col.IsValid() {
                return nil, nil, fmt.Errorf("invalid main pcollection")
        }
diff --git a/sdks/go/pkg/beam/x/debug/head.go b/sdks/go/pkg/beam/x/debug/head.go
index 0e045fad45c..91302367a29 100644
--- a/sdks/go/pkg/beam/x/debug/head.go
+++ b/sdks/go/pkg/beam/x/debug/head.go
@@ -29,7 +29,7 @@ func init() {
 
 // Head returns the first "n" elements it sees, it doesn't enforce any logic
 // as to what elements they will be.
-func Head(s *beam.Scope, col beam.PCollection, n int) beam.PCollection {
+func Head(s beam.Scope, col beam.PCollection, n int) beam.PCollection {
        s = s.Scope("debug.Head")
 
        switch {
diff --git a/sdks/go/pkg/beam/x/debug/print.go 
b/sdks/go/pkg/beam/x/debug/print.go
index 2c8d3ae8c61..f8ef4f76ea1 100644
--- a/sdks/go/pkg/beam/x/debug/print.go
+++ b/sdks/go/pkg/beam/x/debug/print.go
@@ -32,13 +32,13 @@ func init() {
 }
 
 // Print prints out all data. Use with care.
-func Print(s *beam.Scope, col beam.PCollection) beam.PCollection {
+func Print(s beam.Scope, col beam.PCollection) beam.PCollection {
        return Printf(s, "Elm: %v", col)
 }
 
 // Printf prints out all data with custom formatting. The given format string
 // is used as log.Printf(format, elm) for each element. Use with care.
-func Printf(s *beam.Scope, format string, col beam.PCollection) 
beam.PCollection {
+func Printf(s beam.Scope, format string, col beam.PCollection) 
beam.PCollection {
        s = s.Scope("debug.Print")
 
        switch {
@@ -84,7 +84,7 @@ func (f *printGBKFn) ProcessElement(ctx context.Context, x 
beam.X, iter func(*be
 }
 
 // Discard is a sink that discards all data.
-func Discard(s *beam.Scope, col beam.PCollection) {
+func Discard(s beam.Scope, col beam.PCollection) {
        s = s.Scope("debug.Discard")
        beam.ParDo0(s, discardFn, col)
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Develop a Go SDK for Beam
> -------------------------
>
>                 Key: BEAM-2083
>                 URL: https://issues.apache.org/jira/browse/BEAM-2083
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-go
>            Reporter: Bill Neubauer
>            Assignee: Henning Rohde
>
> Allow users of the Go programming language (https://golang.org/) to write 
> Beam pipelines in this language. The effort is focusing on full-fledged SDK 
> that leverages the Beam Fn API to bootstrap a native Go experience.
> Initial design:
>         https://s.apache.org/beam-go-sdk-design-rfc
> Development in the go-sdk branch. Work in progress. YMMV.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to