[ 
https://issues.apache.org/jira/browse/BEAM-4813?focusedWorklogId=128811&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128811
 ]

ASF GitHub Bot logged work on BEAM-4813:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Jul/18 17:26
            Start Date: 30/Jul/18 17:26
    Worklog Time Spent: 10m 
      Work Description: herohde closed pull request #5994:  [BEAM-4813] 
Refactor Go Dataflow runner and translation
URL: https://github.com/apache/beam/pull/5994
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go 
b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index f9168212aa1..6fc2b363ae3 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -159,10 +159,12 @@ func (p *Plan) String() string {
 
 // Metrics returns a snapshot of input progress of the plan, and associated 
metrics.
 func (p *Plan) Metrics() *fnpb.Metrics {
-       snapshot := p.source.Progress()
+       transforms := make(map[string]*fnpb.Metrics_PTransform)
 
-       transforms := map[string]*fnpb.Metrics_PTransform{
-               snapshot.ID: &fnpb.Metrics_PTransform{
+       if p.source != nil {
+               snapshot := p.source.Progress()
+
+               transforms[snapshot.ID] = &fnpb.Metrics_PTransform{
                        ProcessedElements: 
&fnpb.Metrics_PTransform_ProcessedElements{
                                Measured: &fnpb.Metrics_PTransform_Measured{
                                        OutputElementCounts: map[string]int64{
@@ -170,7 +172,7 @@ func (p *Plan) Metrics() *fnpb.Metrics {
                                        },
                                },
                        },
-               },
+               }
        }
 
        for _, pt := range p.parDoIds {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go 
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 67dcf488455..ab8f306b873 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -452,6 +452,9 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                }
                u = &WindowInto{UID: b.idgen.New(), Fn: wfn, Out: out[0]}
 
+       case graphx.URNFlatten:
+               u = &Flatten{UID: b.idgen.New(), N: len(transform.Inputs), Out: 
out[0]}
+
        case urnDataSink:
                port, cid, err := unmarshalPort(payload)
                if err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go 
b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
new file mode 100644
index 00000000000..52d5f6a9ea2
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package graphx
+
+import (
+       "fmt"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+)
+
+// TODO(herohde) 7/17/2018: move CoderRef to dataflowlib once Dataflow
+// sends back pure coder protos in the process bundle descriptors.
+
+// CoderRef defines the (structured) Coder in serializable form. It is
+// an artifact of the CloudObject encoding.
+type CoderRef struct {
+       Type         string      `json:"@type,omitempty"`
+       Components   []*CoderRef `json:"component_encodings,omitempty"`
+       IsWrapper    bool        `json:"is_wrapper,omitempty"`
+       IsPairLike   bool        `json:"is_pair_like,omitempty"`
+       IsStreamLike bool        `json:"is_stream_like,omitempty"`
+}
+
+// Exported types are used for translation lookup.
+const (
+       windowedValueType = "kind:windowed_value"
+       bytesType         = "kind:bytes"
+       varIntType        = "kind:varint"
+       streamType        = "kind:stream"
+       pairType          = "kind:pair"
+       lengthPrefixType  = "kind:length_prefix"
+
+       globalWindowType   = "kind:global_window"
+       intervalWindowType = "kind:interval_window"
+
+       cogbklistType = "kind:cogbklist" // CoGBK representation. Not a coder.
+)
+
+// WrapIterable adds an iterable (stream) coder for Dataflow side input.
+func WrapIterable(c *CoderRef) *CoderRef {
+       return &CoderRef{Type: streamType, Components: []*CoderRef{c}, 
IsStreamLike: true}
+}
+
+// WrapWindowed adds a windowed coder for Dataflow collections.
+func WrapWindowed(c *CoderRef, wc *coder.WindowCoder) *CoderRef {
+       w, err := encodeWindowCoder(wc)
+       if err != nil {
+               panic(err)
+       }
+       return &CoderRef{Type: windowedValueType, Components: []*CoderRef{c, 
w}, IsWrapper: true}
+}
+
+// EncodeCoderRefs returns the encoded forms understood by the runner.
+func EncodeCoderRefs(list []*coder.Coder) ([]*CoderRef, error) {
+       var refs []*CoderRef
+       for _, c := range list {
+               ref, err := EncodeCoderRef(c)
+               if err != nil {
+                       return nil, err
+               }
+               refs = append(refs, ref)
+       }
+       return refs, nil
+}
+
+// EncodeCoderRef returns the encoded form understood by the runner.
+func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
+       switch c.Kind {
+       case coder.Custom:
+               ref, err := encodeCustomCoder(c.Custom)
+               if err != nil {
+                       return nil, err
+               }
+               data, err := protox.EncodeBase64(ref)
+               if err != nil {
+                       return nil, err
+               }
+               return &CoderRef{Type: lengthPrefixType, Components: 
[]*CoderRef{{Type: data}}}, nil
+
+       case coder.KV:
+               if len(c.Components) != 2 {
+                       return nil, fmt.Errorf("bad KV: %v", c)
+               }
+
+               key, err := EncodeCoderRef(c.Components[0])
+               if err != nil {
+                       return nil, err
+               }
+               value, err := EncodeCoderRef(c.Components[1])
+               if err != nil {
+                       return nil, err
+               }
+               return &CoderRef{Type: pairType, Components: []*CoderRef{key, 
value}, IsPairLike: true}, nil
+
+       case coder.CoGBK:
+               if len(c.Components) < 2 {
+                       return nil, fmt.Errorf("bad CoGBK: %v", c)
+               }
+
+               refs, err := EncodeCoderRefs(c.Components)
+               if err != nil {
+                       return nil, err
+               }
+
+               value := refs[1]
+               if len(c.Components) > 2 {
+                       // TODO(BEAM-490): don't inject union coder for CoGBK.
+
+                       union := &CoderRef{Type: cogbklistType, Components: 
refs[1:]}
+                       value = &CoderRef{Type: lengthPrefixType, Components: 
[]*CoderRef{union}}
+               }
+
+               stream := &CoderRef{Type: streamType, Components: 
[]*CoderRef{value}, IsStreamLike: true}
+               return &CoderRef{Type: pairType, Components: 
[]*CoderRef{refs[0], stream}, IsPairLike: true}, nil
+
+       case coder.WindowedValue:
+               if len(c.Components) != 1 || c.Window == nil {
+                       return nil, fmt.Errorf("bad windowed value: %v", c)
+               }
+
+               elm, err := EncodeCoderRef(c.Components[0])
+               if err != nil {
+                       return nil, err
+               }
+               w, err := encodeWindowCoder(c.Window)
+               if err != nil {
+                       return nil, err
+               }
+               return &CoderRef{Type: windowedValueType, Components: 
[]*CoderRef{elm, w}, IsWrapper: true}, nil
+
+       case coder.Bytes:
+               // TODO(herohde) 6/27/2017: add length-prefix and not assume 
nested by context?
+               return &CoderRef{Type: bytesType}, nil
+
+       case coder.VarInt:
+               return &CoderRef{Type: varIntType}, nil
+
+       default:
+               return nil, fmt.Errorf("bad coder kind: %v", c.Kind)
+       }
+}
+
+// DecodeCoderRefs extracts usable coders from the encoded runner form.
+func DecodeCoderRefs(list []*CoderRef) ([]*coder.Coder, error) {
+       var ret []*coder.Coder
+       for _, ref := range list {
+               c, err := DecodeCoderRef(ref)
+               if err != nil {
+                       return nil, err
+               }
+               ret = append(ret, c)
+       }
+       return ret, nil
+}
+
+// DecodeCoderRef extracts a usable coder from the encoded runner form.
+func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
+       switch c.Type {
+       case bytesType:
+               return coder.NewBytes(), nil
+
+       case varIntType:
+               return coder.NewVarInt(), nil
+
+       case pairType:
+               if len(c.Components) != 2 {
+                       return nil, fmt.Errorf("bad pair: %+v", c)
+               }
+
+               key, err := DecodeCoderRef(c.Components[0])
+               if err != nil {
+                       return nil, err
+               }
+
+               elm := c.Components[1]
+               kind := coder.KV
+               root := typex.KVType
+
+               isGBK := elm.Type == streamType
+               if isGBK {
+                       elm = elm.Components[0]
+                       kind = coder.CoGBK
+                       root = typex.CoGBKType
+
+                       // TODO(BEAM-490): If CoGBK with > 1 input, handle as 
special GBK. We expect
+                       // it to be encoded as CoGBK<K,LP<Union<V,W,..>>. 
Remove this handling once
+                       // CoGBK has a first-class representation.
+
+                       if refs, ok := isCoGBKList(elm); ok {
+                               values, err := DecodeCoderRefs(refs)
+                               if err != nil {
+                                       return nil, err
+                               }
+
+                               t := typex.New(root, 
append([]typex.FullType{key.T}, coder.Types(values)...)...)
+                               return &coder.Coder{Kind: kind, T: t, 
Components: append([]*coder.Coder{key}, values...)}, nil
+                       }
+               }
+
+               value, err := DecodeCoderRef(elm)
+               if err != nil {
+                       return nil, err
+               }
+
+               t := typex.New(root, key.T, value.T)
+               return &coder.Coder{Kind: kind, T: t, Components: 
[]*coder.Coder{key, value}}, nil
+
+       case lengthPrefixType:
+               if len(c.Components) != 1 {
+                       return nil, fmt.Errorf("bad length prefix: %+v", c)
+               }
+
+               var ref v1.CustomCoder
+               if err := protox.DecodeBase64(c.Components[0].Type, &ref); err 
!= nil {
+                       return nil, fmt.Errorf("base64 decode for %v failed: 
%v", c.Components[0].Type, err)
+               }
+               custom, err := decodeCustomCoder(&ref)
+               if err != nil {
+                       return nil, err
+               }
+               t := typex.New(custom.Type)
+               return &coder.Coder{Kind: coder.Custom, T: t, Custom: custom}, 
nil
+
+       case windowedValueType:
+               if len(c.Components) != 2 {
+                       return nil, fmt.Errorf("bad windowed value: %+v", c)
+               }
+
+               elm, err := DecodeCoderRef(c.Components[0])
+               if err != nil {
+                       return nil, err
+               }
+               w, err := decodeWindowCoder(c.Components[1])
+               if err != nil {
+                       return nil, err
+               }
+               t := typex.New(typex.WindowedValueType, elm.T)
+
+               return &coder.Coder{Kind: coder.WindowedValue, T: t, 
Components: []*coder.Coder{elm}, Window: w}, nil
+
+       case streamType:
+               return nil, fmt.Errorf("stream must be pair value: %+v", c)
+
+       default:
+               return nil, fmt.Errorf("custom coders must be length prefixed: 
%+v", c)
+       }
+}
+
+func isCoGBKList(ref *CoderRef) ([]*CoderRef, bool) {
+       if ref.Type != lengthPrefixType {
+               return nil, false
+       }
+       ref2 := ref.Components[0]
+       if ref2.Type != cogbklistType {
+               return nil, false
+       }
+       return ref2.Components, true
+}
+
+// encodeWindowCoder translates the preprocessed representation of a Beam coder
+// into the wire representation, capturing the underlying types used by
+// the coder.
+func encodeWindowCoder(w *coder.WindowCoder) (*CoderRef, error) {
+       switch w.Kind {
+       case coder.GlobalWindow:
+               return &CoderRef{Type: globalWindowType}, nil
+       case coder.IntervalWindow:
+               return &CoderRef{Type: intervalWindowType}, nil
+       default:
+               return nil, fmt.Errorf("bad window kind: %v", w.Kind)
+       }
+}
+
+// decodeWindowCoder receives the wire representation of a Beam coder, 
extracting
+// the preprocessed representation, expanding all types used by the coder.
+func decodeWindowCoder(w *CoderRef) (*coder.WindowCoder, error) {
+       switch w.Type {
+       case globalWindowType:
+               return coder.NewGlobalWindow(), nil
+       case intervalWindowType:
+               return coder.NewIntervalWindow(), nil
+       default:
+               return nil, fmt.Errorf("bad window: %v", w.Type)
+       }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go 
b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 1a0ef4723d4..6e2ac905404 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -29,7 +29,6 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
@@ -743,268 +742,3 @@ func decodeInputKind(k v1.MultiEdge_Inbound_InputKind) 
(graph.InputKind, error)
                return graph.Main, fmt.Errorf("invalid input kind: %v", k)
        }
 }
-
-// CoderRef defines the (structured) Coder in serializable form. It is
-// an artifact of the CloudObject encoding.
-type CoderRef struct {
-       Type         string      `json:"@type,omitempty"`
-       Components   []*CoderRef `json:"component_encodings,omitempty"`
-       IsWrapper    bool        `json:"is_wrapper,omitempty"`
-       IsPairLike   bool        `json:"is_pair_like,omitempty"`
-       IsStreamLike bool        `json:"is_stream_like,omitempty"`
-}
-
-// Exported types are used for translation lookup.
-const (
-       WindowedValueType = "kind:windowed_value"
-       BytesType         = "kind:bytes"
-       VarIntType        = "kind:varint"
-       streamType        = "kind:stream"
-       pairType          = "kind:pair"
-       lengthPrefixType  = "kind:length_prefix"
-
-       GlobalWindowType   = "kind:global_window"
-       IntervalWindowType = "kind:interval_window"
-
-       cogbklistType = "kind:cogbklist" // CoGBK representation. Not a coder.
-)
-
-// WrapExtraWindowedValue adds an additional WV needed for side input, which
-// expects the coder to have exactly one component with the element.
-func WrapExtraWindowedValue(c *CoderRef) *CoderRef {
-       return &CoderRef{Type: WindowedValueType, Components: []*CoderRef{c, 
c.Components[1]}}
-}
-
-// EncodeCoderRefs returns the encoded forms understood by the runner.
-func EncodeCoderRefs(list []*coder.Coder) ([]*CoderRef, error) {
-       var refs []*CoderRef
-       for _, c := range list {
-               ref, err := EncodeCoderRef(c)
-               if err != nil {
-                       return nil, err
-               }
-               refs = append(refs, ref)
-       }
-       return refs, nil
-}
-
-// EncodeCoderRef returns the encoded form understood by the runner.
-func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
-       switch c.Kind {
-       case coder.Custom:
-               ref, err := encodeCustomCoder(c.Custom)
-               if err != nil {
-                       return nil, err
-               }
-               data, err := protox.EncodeBase64(ref)
-               if err != nil {
-                       return nil, err
-               }
-               return &CoderRef{Type: lengthPrefixType, Components: 
[]*CoderRef{{Type: data}}}, nil
-
-       case coder.KV:
-               if len(c.Components) != 2 {
-                       return nil, fmt.Errorf("bad KV: %v", c)
-               }
-
-               key, err := EncodeCoderRef(c.Components[0])
-               if err != nil {
-                       return nil, err
-               }
-               value, err := EncodeCoderRef(c.Components[1])
-               if err != nil {
-                       return nil, err
-               }
-               return &CoderRef{Type: pairType, Components: []*CoderRef{key, 
value}, IsPairLike: true}, nil
-
-       case coder.CoGBK:
-               if len(c.Components) < 2 {
-                       return nil, fmt.Errorf("bad CoGBK: %v", c)
-               }
-
-               refs, err := EncodeCoderRefs(c.Components)
-               if err != nil {
-                       return nil, err
-               }
-
-               value := refs[1]
-               if len(c.Components) > 2 {
-                       // TODO(BEAM-490): don't inject union coder for CoGBK.
-
-                       union := &CoderRef{Type: cogbklistType, Components: 
refs[1:]}
-                       value = &CoderRef{Type: lengthPrefixType, Components: 
[]*CoderRef{union}}
-               }
-
-               stream := &CoderRef{Type: streamType, Components: 
[]*CoderRef{value}, IsStreamLike: true}
-               return &CoderRef{Type: pairType, Components: 
[]*CoderRef{refs[0], stream}, IsPairLike: true}, nil
-
-       case coder.WindowedValue:
-               if len(c.Components) != 1 || c.Window == nil {
-                       return nil, fmt.Errorf("bad windowed value: %v", c)
-               }
-
-               elm, err := EncodeCoderRef(c.Components[0])
-               if err != nil {
-                       return nil, err
-               }
-               w, err := encodeWindowCoder(c.Window)
-               if err != nil {
-                       return nil, err
-               }
-               return &CoderRef{Type: WindowedValueType, Components: 
[]*CoderRef{elm, w}, IsWrapper: true}, nil
-
-       case coder.Bytes:
-               // TODO(herohde) 6/27/2017: add length-prefix and not assume 
nested by context?
-               return &CoderRef{Type: BytesType}, nil
-
-       case coder.VarInt:
-               return &CoderRef{Type: VarIntType}, nil
-
-       default:
-               return nil, fmt.Errorf("bad coder kind: %v", c.Kind)
-       }
-}
-
-// DecodeCoderRefs extracts usable coders from the encoded runner form.
-func DecodeCoderRefs(list []*CoderRef) ([]*coder.Coder, error) {
-       var ret []*coder.Coder
-       for _, ref := range list {
-               c, err := DecodeCoderRef(ref)
-               if err != nil {
-                       return nil, err
-               }
-               ret = append(ret, c)
-       }
-       return ret, nil
-}
-
-// DecodeCoderRef extracts a usable coder from the encoded runner form.
-func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
-       switch c.Type {
-       case BytesType:
-               return coder.NewBytes(), nil
-
-       case VarIntType:
-               return coder.NewVarInt(), nil
-
-       case pairType:
-               if len(c.Components) != 2 {
-                       return nil, fmt.Errorf("bad pair: %+v", c)
-               }
-
-               key, err := DecodeCoderRef(c.Components[0])
-               if err != nil {
-                       return nil, err
-               }
-
-               elm := c.Components[1]
-               kind := coder.KV
-               root := typex.KVType
-
-               isGBK := elm.Type == streamType
-               if isGBK {
-                       elm = elm.Components[0]
-                       kind = coder.CoGBK
-                       root = typex.CoGBKType
-
-                       // TODO(BEAM-490): If CoGBK with > 1 input, handle as 
special GBK. We expect
-                       // it to be encoded as CoGBK<K,LP<Union<V,W,..>>. 
Remove this handling once
-                       // CoGBK has a first-class representation.
-
-                       if refs, ok := isCoGBKList(elm); ok {
-                               values, err := DecodeCoderRefs(refs)
-                               if err != nil {
-                                       return nil, err
-                               }
-
-                               t := typex.New(root, 
append([]typex.FullType{key.T}, coder.Types(values)...)...)
-                               return &coder.Coder{Kind: kind, T: t, 
Components: append([]*coder.Coder{key}, values...)}, nil
-                       }
-               }
-
-               value, err := DecodeCoderRef(elm)
-               if err != nil {
-                       return nil, err
-               }
-
-               t := typex.New(root, key.T, value.T)
-               return &coder.Coder{Kind: kind, T: t, Components: 
[]*coder.Coder{key, value}}, nil
-
-       case lengthPrefixType:
-               if len(c.Components) != 1 {
-                       return nil, fmt.Errorf("bad length prefix: %+v", c)
-               }
-
-               var ref v1.CustomCoder
-               if err := protox.DecodeBase64(c.Components[0].Type, &ref); err 
!= nil {
-                       return nil, fmt.Errorf("base64 decode for %v failed: 
%v", c.Components[0].Type, err)
-               }
-               custom, err := decodeCustomCoder(&ref)
-               if err != nil {
-                       return nil, err
-               }
-               t := typex.New(custom.Type)
-               return &coder.Coder{Kind: coder.Custom, T: t, Custom: custom}, 
nil
-
-       case WindowedValueType:
-               if len(c.Components) != 2 {
-                       return nil, fmt.Errorf("bad windowed value: %+v", c)
-               }
-
-               elm, err := DecodeCoderRef(c.Components[0])
-               if err != nil {
-                       return nil, err
-               }
-               w, err := decodeWindowCoder(c.Components[1])
-               if err != nil {
-                       return nil, err
-               }
-               t := typex.New(typex.WindowedValueType, elm.T)
-
-               return &coder.Coder{Kind: coder.WindowedValue, T: t, 
Components: []*coder.Coder{elm}, Window: w}, nil
-
-       case streamType:
-               return nil, fmt.Errorf("stream must be pair value: %+v", c)
-
-       default:
-               return nil, fmt.Errorf("custom coders must be length prefixed: 
%+v", c)
-       }
-}
-
-func isCoGBKList(ref *CoderRef) ([]*CoderRef, bool) {
-       if ref.Type != lengthPrefixType {
-               return nil, false
-       }
-       ref2 := ref.Components[0]
-       if ref2.Type != cogbklistType {
-               return nil, false
-       }
-       return ref2.Components, true
-}
-
-// encodeWindowCoder translates the preprocessed representation of a Beam coder
-// into the wire representation, capturing the underlying types used by
-// the coder.
-func encodeWindowCoder(w *coder.WindowCoder) (*CoderRef, error) {
-       switch w.Kind {
-       case coder.GlobalWindow:
-               return &CoderRef{Type: GlobalWindowType}, nil
-       case coder.IntervalWindow:
-               return &CoderRef{Type: IntervalWindowType}, nil
-       default:
-               return nil, fmt.Errorf("bad window kind: %v", w.Kind)
-       }
-}
-
-// decodeWindowCoder receives the wire representation of a Beam coder, 
extracting
-// the preprocessed representation, expanding all types used by the coder.
-func decodeWindowCoder(w *CoderRef) (*coder.WindowCoder, error) {
-       switch w.Type {
-       case GlobalWindowType:
-               return coder.NewGlobalWindow(), nil
-       case IntervalWindowType:
-               return coder.NewIntervalWindow(), nil
-       default:
-               return nil, fmt.Errorf("bad window: %v", w.Type)
-       }
-}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index a59e53187df..d07e0b76d96 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -22,6 +22,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/golang/protobuf/proto"
@@ -38,6 +39,9 @@ const (
        URNCombinePerKey = "beam:transform:combine_per_key:v1"
        URNWindow        = "beam:transform:window:v1"
 
+       URNIterableSideInput = "beam:side_input:iterable:v1"
+       URNMultimapSideInput = "beam:side_input:multimap:v1"
+
        URNGlobalWindowsWindowFn  = "beam:windowfn:global_windows:v0.1"
        URNFixedWindowsWindowFn   = "beam:windowfn:fixed_windows:v0.1"
        URNSlidingWindowsWindowFn = "beam:windowfn:sliding_windows:v0.1"
@@ -63,23 +67,19 @@ type Options struct {
 // Marshal converts a graph to a model pipeline.
 func Marshal(edges []*graph.MultiEdge, opt *Options) (*pb.Pipeline, error) {
        tree := NewScopeTree(edges)
-       EnsureUniqueNames(tree)
 
        m := newMarshaller(opt)
-
-       var roots []string
        for _, edge := range tree.Edges {
-               roots = append(roots, m.addMultiEdge("", edge))
+               m.addMultiEdge(edge)
        }
        for _, t := range tree.Children {
-               roots = append(roots, m.addScopeTree("", t))
+               m.addScopeTree(t)
        }
 
        p := &pb.Pipeline{
-               Components:       m.build(),
-               RootTransformIds: roots,
+               Components: m.build(),
        }
-       return p, nil
+       return pipelinex.Normalize(p)
 }
 
 type marshaller struct {
@@ -117,38 +117,23 @@ func (m *marshaller) build() *pb.Components {
        }
 }
 
-func (m *marshaller) addScopeTree(trunk string, s *ScopeTree) string {
+func (m *marshaller) addScopeTree(s *ScopeTree) string {
        id := scopeID(s.Scope.Scope)
        if _, exists := m.transforms[id]; exists {
                return id
        }
 
-       uniqueName := fmt.Sprintf("%v/%v", trunk, s.Scope.Name)
-
        var subtransforms []string
        for _, edge := range s.Edges {
-               subtransforms = append(subtransforms, 
m.addMultiEdge(uniqueName, edge))
+               subtransforms = append(subtransforms, m.addMultiEdge(edge))
        }
        for _, tree := range s.Children {
-               subtransforms = append(subtransforms, 
m.addScopeTree(uniqueName, tree))
-       }
-
-       // Compute the input/output for this scope:
-       //    inputs  := U(subinputs)\U(suboutputs)
-       //    outputs := U(suboutputs)\U(subinputs)
-       // where U is set union and \ is set subtraction.
-
-       in := make(map[string]bool)
-       out := make(map[string]bool)
-       for _, sid := range subtransforms {
-               inout(m.transforms[sid], in, out)
+               subtransforms = append(subtransforms, m.addScopeTree(tree))
        }
 
        transform := &pb.PTransform{
-               UniqueName:    uniqueName,
+               UniqueName:    s.Scope.Name,
                Subtransforms: subtransforms,
-               Inputs:        diff(in, out),
-               Outputs:       diff(out, in),
        }
 
        m.updateIfCombineComposite(s, transform)
@@ -203,29 +188,8 @@ func tryAddingCoder(c *coder.Coder) (ok bool) {
        return true
 }
 
-// diff computes A\B and returns its keys as an identity map.
-func diff(a, b map[string]bool) map[string]string {
-       ret := make(map[string]string)
-       for key := range a {
-               if !b[key] {
-                       ret[key] = key
-               }
-       }
-       return ret
-}
-
-// inout adds the input and output pcollection ids to the accumulators.
-func inout(transform *pb.PTransform, in, out map[string]bool) {
-       for _, col := range transform.GetInputs() {
-               in[col] = true
-       }
-       for _, col := range transform.GetOutputs() {
-               out[col] = true
-       }
-}
-
-func (m *marshaller) addMultiEdge(trunk string, edge NamedEdge) string {
-       id := StableMultiEdgeID(edge.Edge)
+func (m *marshaller) addMultiEdge(edge NamedEdge) string {
+       id := edgeID(edge.Edge)
        if _, exists := m.transforms[id]; exists {
                return id
        }
@@ -246,7 +210,7 @@ func (m *marshaller) addMultiEdge(trunk string, edge 
NamedEdge) string {
        }
 
        transform := &pb.PTransform{
-               UniqueName: fmt.Sprintf("%v/%v", trunk, edge.Name),
+               UniqueName: edge.Name,
                Spec:       m.makePayload(edge.Edge),
                Inputs:     inputs,
                Outputs:    outputs,
@@ -260,13 +224,12 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
        // TODO(BEAM-490): replace once CoGBK is a primitive. For now, we have 
to translate
        // CoGBK with multiple PCollections as described in cogbk.go.
 
-       // TODO(herohde) 1/26/2018: we should make the expanded GBK a composite 
if we care
-       // about correctly computing input/output in the enclosing Scope.
-
-       id := StableMultiEdgeID(edge.Edge)
+       id := edgeID(edge.Edge)
        kvCoderID := m.coders.Add(MakeKVUnionCoder(edge.Edge))
        gbkCoderID := m.coders.Add(MakeGBKUnionCoder(edge.Edge))
 
+       var subtransforms []string
+
        inputs := make(map[string]string)
        for i, in := range edge.Edge.Input {
                m.addNode(in.From)
@@ -276,15 +239,15 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
 
                // Inject(i)
 
-               injectID := StableCoGBKInjectID(id, i)
+               injectID := fmt.Sprintf("%v_inject%v", id, i)
                payload := &pb.ParDoPayload{
                        DoFn: &pb.SdkFunctionSpec{
                                Spec: &pb.FunctionSpec{
                                        Urn: URNInject,
-                                       Payload: 
protox.MustEncode(&v1.TransformPayload{
+                                       Payload: 
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
                                                Urn:    URNInject,
                                                Inject: &v1.InjectPayload{N: 
(int32)(i)},
-                                       }),
+                                       })),
                                },
                                EnvironmentId: m.addDefaultEnv(),
                        },
@@ -299,6 +262,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
                        Outputs: map[string]string{"i0": out},
                }
                m.transforms[injectID] = inject
+               subtransforms = append(subtransforms, injectID)
 
                inputs[fmt.Sprintf("i%v", i)] = out
        }
@@ -310,7 +274,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
        out := fmt.Sprintf("%v_flatten", nodeID(outNode))
        m.makeNode(out, kvCoderID, outNode)
 
-       flattenID := StableCoGBKFlattenID(id)
+       flattenID := fmt.Sprintf("%v_flatten", id)
        flatten := &pb.PTransform{
                UniqueName: flattenID,
                Spec:       &pb.FunctionSpec{Urn: URNFlatten},
@@ -318,44 +282,98 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
                Outputs:    map[string]string{"i0": out},
        }
        m.transforms[flattenID] = flatten
+       subtransforms = append(subtransforms, flattenID)
 
        // CoGBK
 
        gbkOut := fmt.Sprintf("%v_out", nodeID(outNode))
        m.makeNode(gbkOut, gbkCoderID, outNode)
 
-       gbkID := StableCoGBKGBKID(id)
+       gbkID := fmt.Sprintf("%v_gbk", id)
        gbk := &pb.PTransform{
-               UniqueName: edge.Name,
+               UniqueName: gbkID,
                Spec:       m.makePayload(edge.Edge),
                Inputs:     map[string]string{"i0": out},
                Outputs:    map[string]string{"i0": gbkOut},
        }
        m.transforms[gbkID] = gbk
+       subtransforms = append(subtransforms, gbkID)
 
        // Expand
 
        m.addNode(outNode)
 
+       expandID := fmt.Sprintf("%v_expand", id)
+       payload := &pb.ParDoPayload{
+               DoFn: &pb.SdkFunctionSpec{
+                       Spec: &pb.FunctionSpec{
+                               Urn: URNExpand,
+                               Payload: 
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
+                                       Urn: URNExpand,
+                               })),
+                       },
+                       EnvironmentId: m.addDefaultEnv(),
+               },
+       }
        expand := &pb.PTransform{
-               UniqueName: id,
+               UniqueName: expandID,
                Spec: &pb.FunctionSpec{
-                       Urn:     URNExpand,
-                       Payload: protox.MustEncode(&v1.TransformPayload{Urn: 
URNExpand}),
+                       Urn:     URNParDo,
+                       Payload: protox.MustEncode(payload),
                },
-               Inputs:  map[string]string{"i0": out},
+               Inputs:  map[string]string{"i0": gbkOut},
                Outputs: map[string]string{"i0": nodeID(outNode)},
        }
        m.transforms[id] = expand
+       subtransforms = append(subtransforms, id)
+
+       // Add composite for visualization
+
+       cogbkID := fmt.Sprintf("%v_cogbk", id)
+       m.transforms[cogbkID] = &pb.PTransform{
+               UniqueName:    edge.Name,
+               Subtransforms: subtransforms,
+       }
        return id
 }
 
 func (m *marshaller) makePayload(edge *graph.MultiEdge) *pb.FunctionSpec {
        switch edge.Op {
        case graph.Impulse:
+               // TODO(herohde) 7/18/2018: Encode data?
                return &pb.FunctionSpec{Urn: URNImpulse}
 
-       case graph.ParDo, graph.Combine:
+       case graph.ParDo:
+               si := make(map[string]*pb.SideInput)
+               for i, in := range edge.Input {
+                       switch in.Kind {
+                       case graph.Main:
+                               // ignore: not a side input
+                       case graph.Singleton, graph.Slice, graph.Iter, 
graph.ReIter:
+                               si[fmt.Sprintf("i%v", i)] = &pb.SideInput{
+                                       AccessPattern: &pb.FunctionSpec{Urn: 
URNIterableSideInput},
+                                       // TODO(herohde) 7/16/2018: side input 
data
+                               }
+                       case graph.Map, graph.MultiMap:
+                               panic("NYI")
+                       default:
+                               panic(fmt.Sprintf("unexpected input kind: %v", 
edge))
+                       }
+               }
+
+               payload := &pb.ParDoPayload{
+                       DoFn: &pb.SdkFunctionSpec{
+                               Spec: &pb.FunctionSpec{
+                                       Urn:     URNJavaDoFn,
+                                       Payload: 
[]byte(mustEncodeMultiEdgeBase64(edge)),
+                               },
+                               EnvironmentId: m.addDefaultEnv(),
+                       },
+                       SideInputs: si,
+               }
+               return &pb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
+
+       case graph.Combine:
                payload := &pb.ParDoPayload{
                        DoFn: &pb.SdkFunctionSpec{
                                Spec: &pb.FunctionSpec{
@@ -426,7 +444,7 @@ func (m *marshaller) addDefaultEnv() string {
 }
 
 func (m *marshaller) addWindowingStrategy(w *window.WindowingStrategy) string {
-       ws := MarshalWindowingStrategy(m.coders, w)
+       ws := marshalWindowingStrategy(m.coders, w)
        return m.internWindowingStrategy(ws)
 }
 
@@ -442,12 +460,9 @@ func (m *marshaller) internWindowingStrategy(w 
*pb.WindowingStrategy) string {
        return id
 }
 
-// TODO(herohde) 4/14/2018: make below function private or refactor,
-// once Dataflow doesn't need it anymore.
-
-// MarshalWindowingStrategy marshals the given windowing strategy in
+// marshalWindowingStrategy marshals the given windowing strategy in
 // the given coder context.
-func MarshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) 
*pb.WindowingStrategy {
+func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) 
*pb.WindowingStrategy {
        ws := &pb.WindowingStrategy{
                WindowFn: &pb.SdkFunctionSpec{
                        Spec: makeWindowFn(w.Fn),
@@ -529,6 +544,10 @@ func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) 
string {
        })
 }
 
+func edgeID(edge *graph.MultiEdge) string {
+       return fmt.Sprintf("e%v", edge.ID())
+}
+
 func nodeID(n *graph.Node) string {
        return fmt.Sprintf("n%v", n.ID())
 }
@@ -536,24 +555,3 @@ func nodeID(n *graph.Node) string {
 func scopeID(s *graph.Scope) string {
        return fmt.Sprintf("s%v", s.ID())
 }
-
-// TODO(herohde) 4/17/2018: StableXXXID returns deterministic transform ids
-// for reference in the Dataflow runner. A better solution is to translate
-// the proto pipeline to the Dataflow representation (or for Dataflow to
-// support proto pipelines directly).
-
-func StableMultiEdgeID(edge *graph.MultiEdge) string {
-       return fmt.Sprintf("e%v", edge.ID())
-}
-
-func StableCoGBKInjectID(id string, i int) string {
-       return fmt.Sprintf("%v_inject%v", id, i)
-}
-
-func StableCoGBKFlattenID(id string) string {
-       return fmt.Sprintf("%v_flatten", id)
-}
-
-func StableCoGBKGBKID(id string) string {
-       return fmt.Sprintf("%v_gbk", id)
-}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/tree.go 
b/sdks/go/pkg/beam/core/runtime/graphx/tree.go
index 3e3b925dc25..c48fd8cf9c8 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/tree.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/tree.go
@@ -16,8 +16,6 @@
 package graphx
 
 import (
-       "fmt"
-
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
 )
 
@@ -90,32 +88,3 @@ func (t *treeBuilder) addScope(s *graph.Scope) *ScopeTree {
        }
        return tree
 }
-
-// EnsureUniqueNames ensures that each name is unique within each ScopeTree
-// recursively. Any conflict is resolved by adding '1, '2, etc to the name.
-func EnsureUniqueNames(tree *ScopeTree) {
-       seen := make(map[string]bool)
-       for _, edge := range tree.Edges {
-               edge.Name = findFreeName(seen, edge.Name)
-               seen[edge.Name] = true
-       }
-
-       for _, s := range tree.Children {
-               EnsureUniqueNames(s)
-
-               s.Scope.Name = findFreeName(seen, s.Scope.Name)
-               seen[s.Scope.Name] = true
-       }
-}
-
-func findFreeName(seen map[string]bool, name string) string {
-       if !seen[name] {
-               return name
-       }
-       for i := 1; ; i++ {
-               next := fmt.Sprintf("%v'%v", name, i)
-               if !seen[next] {
-                       return next
-               }
-       }
-}
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/clone.go 
b/sdks/go/pkg/beam/core/runtime/pipelinex/clone.go
new file mode 100644
index 00000000000..56dc20eae0f
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/clone.go
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipelinex
+
+import (
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+func shallowClonePipeline(p *pb.Pipeline) *pb.Pipeline {
+       ret := &pb.Pipeline{
+               Components: shallowCloneComponents(p.GetComponents()),
+       }
+       ret.RootTransformIds, _ = 
reflectx.ShallowClone(p.GetRootTransformIds()).([]string)
+       return ret
+}
+
+func shallowCloneComponents(comp *pb.Components) *pb.Components {
+       ret := &pb.Components{}
+       ret.Transforms, _ = 
reflectx.ShallowClone(comp.GetTransforms()).(map[string]*pb.PTransform)
+       ret.Pcollections, _ = 
reflectx.ShallowClone(comp.GetPcollections()).(map[string]*pb.PCollection)
+       ret.WindowingStrategies, _ = 
reflectx.ShallowClone(comp.GetWindowingStrategies()).(map[string]*pb.WindowingStrategy)
+       ret.Coders, _ = 
reflectx.ShallowClone(comp.GetCoders()).(map[string]*pb.Coder)
+       ret.Environments, _ = 
reflectx.ShallowClone(comp.GetEnvironments()).(map[string]*pb.Environment)
+       return ret
+}
+
+// ShallowClonePTransform makes a shallow copy of the given PTransform.
+func ShallowClonePTransform(t *pb.PTransform) *pb.PTransform {
+       if t == nil {
+               return nil
+       }
+
+       ret := &pb.PTransform{
+               UniqueName:  t.UniqueName,
+               Spec:        t.Spec,
+               DisplayData: t.DisplayData,
+       }
+       ret.Subtransforms, _ = reflectx.ShallowClone(t.Subtransforms).([]string)
+       ret.Inputs, _ = reflectx.ShallowClone(t.Inputs).(map[string]string)
+       ret.Outputs, _ = reflectx.ShallowClone(t.Outputs).(map[string]string)
+       return ret
+}
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go 
b/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
new file mode 100644
index 00000000000..4f5d0f1a620
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipelinex
+
+import (
+       "reflect"
+       "testing"
+
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+func TestShallowClonePTransform(t *testing.T) {
+       tests := []*pb.PTransform{
+               {},
+               {UniqueName: "a"},
+               {Spec: &pb.FunctionSpec{Urn: "foo"}},
+               {Subtransforms: []string{"a", "b"}},
+               {Inputs: map[string]string{"a": "b"}},
+               {Outputs: map[string]string{"a": "b"}},
+       }
+
+       for _, test := range tests {
+               actual := ShallowClonePTransform(test)
+               if !reflect.DeepEqual(actual, test) {
+                       t.Errorf("ShallowClonePCollection(%v) = %v, want id", 
test, actual)
+               }
+       }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go 
b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
new file mode 100644
index 00000000000..56f14b3fa58
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package pipelinex contains utilities for manipulating Beam proto pipelines.
+// The utilities generally use shallow copies and do not mutate their inputs.
+package pipelinex
+
+import (
+       "fmt"
+       "sort"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+// Update merges a pipeline with the given components, which may add, replace
+// or delete its values. It returns the merged pipeline. The input is not
+// modified.
+func Update(p *pb.Pipeline, values *pb.Components) (*pb.Pipeline, error) {
+       ret := shallowClonePipeline(p)
+       reflectx.UpdateMap(ret.Components.Transforms, values.Transforms)
+       reflectx.UpdateMap(ret.Components.Pcollections, values.Pcollections)
+       reflectx.UpdateMap(ret.Components.WindowingStrategies, 
values.WindowingStrategies)
+       reflectx.UpdateMap(ret.Components.Coders, values.Coders)
+       reflectx.UpdateMap(ret.Components.Environments, values.Environments)
+       return Normalize(ret)
+}
+
+// Normalize recomputes derivative information in the pipeline, such
+// as roots and input/output for composite transforms. It also
+// ensures that names are unique and topologically sorts each
+// subtransform list.
+func Normalize(p *pb.Pipeline) (*pb.Pipeline, error) {
+       if len(p.GetComponents().GetTransforms()) == 0 {
+               return nil, fmt.Errorf("empty pipeline")
+       }
+
+       ret := shallowClonePipeline(p)
+       ret.Components.Transforms = ensureUniqueNames(ret.Components.Transforms)
+       ret.Components.Transforms = 
computeCompositeInputOutput(ret.Components.Transforms)
+       ret.RootTransformIds = computeRoots(ret.Components.Transforms)
+       return ret, nil
+}
+
+// TrimCoders returns the transitive closure of the given coders ids.
+func TrimCoders(coders map[string]*pb.Coder, ids ...string) 
map[string]*pb.Coder {
+       ret := make(map[string]*pb.Coder)
+       for _, id := range ids {
+               walkCoders(coders, ret, id)
+       }
+       return ret
+}
+
+func walkCoders(coders, accum map[string]*pb.Coder, id string) {
+       if _, ok := accum[id]; ok {
+               return // already visited
+       }
+
+       c := coders[id]
+       accum[id] = c
+       for _, sub := range c.ComponentCoderIds {
+               walkCoders(coders, accum, sub)
+       }
+}
+
+// computeRoots returns the root (top-level) transform IDs.
+func computeRoots(xforms map[string]*pb.PTransform) []string {
+       var roots []string
+       parents := makeParentMap(xforms)
+       for id := range xforms {
+               if _, ok := parents[id]; !ok {
+                       // Transforms that do not have a parent are roots
+                       roots = append(roots, id)
+               }
+       }
+       return TopologicalSort(xforms, roots)
+}
+
+func makeParentMap(xforms map[string]*pb.PTransform) map[string]string {
+       parent := make(map[string]string)
+       for id, t := range xforms {
+               for _, key := range t.Subtransforms {
+                       parent[key] = id
+               }
+       }
+       return parent
+}
+
+// computeCompositeInputOutput computes the derived input/output maps
+// for composite transforms.
+func computeCompositeInputOutput(xforms map[string]*pb.PTransform) 
map[string]*pb.PTransform {
+       ret := reflectx.ShallowClone(xforms).(map[string]*pb.PTransform)
+
+       seen := make(map[string]bool)
+       for id := range xforms {
+               walk(id, ret, seen)
+       }
+       return ret
+}
+
+// walk traverses the structure recursively to compute the input/output
+// maps of composite transforms. Update the transform map.
+func walk(id string, ret map[string]*pb.PTransform, seen map[string]bool) {
+       t := ret[id]
+       if seen[id] || len(t.Subtransforms) == 0 {
+               return
+       }
+
+       // Compute the input/output for this composite:
+       //    inputs  := U(subinputs)\U(suboutputs)
+       //    outputs := U(suboutputs)\U(subinputs)
+       // where U is set union and \ is set subtraction.
+
+       in := make(map[string]bool)
+       out := make(map[string]bool)
+       for _, sid := range t.Subtransforms {
+               walk(sid, ret, seen)
+               inout(ret[sid], in, out)
+       }
+
+       upd := ShallowClonePTransform(t)
+       upd.Inputs = diff(in, out)
+       upd.Outputs = diff(out, in)
+       upd.Subtransforms = TopologicalSort(ret, upd.Subtransforms)
+
+       ret[id] = upd
+       seen[id] = true
+}
+
+// diff computes A\B and returns its keys as an identity map.
+func diff(a, b map[string]bool) map[string]string {
+       if len(a) == 0 {
+               return nil
+       }
+       ret := make(map[string]string)
+       for key := range a {
+               if !b[key] {
+                       ret[key] = key
+               }
+       }
+       if len(ret) == 0 {
+               return nil
+       }
+       return ret
+}
+
+// inout adds the input and output pcollection ids to the accumulators.
+func inout(transform *pb.PTransform, in, out map[string]bool) {
+       for _, col := range transform.GetInputs() {
+               in[col] = true
+       }
+       for _, col := range transform.GetOutputs() {
+               out[col] = true
+       }
+}
+
+// ensureUniqueNames ensures that each name is unique. Any conflict is
+// resolved by adding '1, '2, etc to the name.
+func ensureUniqueNames(xforms map[string]*pb.PTransform) 
map[string]*pb.PTransform {
+       ret := reflectx.ShallowClone(xforms).(map[string]*pb.PTransform)
+
+       // Sort the transforms to make renaming deterministic.
+       var ordering []string
+       for id, _ := range xforms {
+               ordering = append(ordering, id)
+       }
+       sort.Strings(ordering)
+
+       seen := make(map[string]bool)
+       for _, id := range ordering {
+               t := xforms[id]
+               name := findFreeName(seen, t.UniqueName)
+               seen[name] = true
+
+               if name != t.UniqueName {
+                       upd := ShallowClonePTransform(t)
+                       upd.UniqueName = name
+                       ret[id] = upd
+               }
+       }
+       return ret
+}
+
+func findFreeName(seen map[string]bool, name string) string {
+       if !seen[name] {
+               return name
+       }
+       for i := 1; ; i++ {
+               next := fmt.Sprintf("%v'%v", name, i)
+               if !seen[next] {
+                       return next
+               }
+       }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go 
b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
new file mode 100644
index 00000000000..bb814cdf118
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipelinex
+
+import (
+       "reflect"
+       "testing"
+
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+func TestEnsureUniqueName(t *testing.T) {
+       tests := []struct {
+               in, exp map[string]*pb.PTransform
+       }{
+               {
+                       in: map[string]*pb.PTransform{
+                               "1": {UniqueName: "a"},
+                               "2": {UniqueName: "b"},
+                               "3": {UniqueName: "c"},
+                       },
+                       exp: map[string]*pb.PTransform{
+                               "1": {UniqueName: "a"},
+                               "2": {UniqueName: "b"},
+                               "3": {UniqueName: "c"},
+                       },
+               },
+               {
+                       in: map[string]*pb.PTransform{
+                               "2": {UniqueName: "a"},
+                               "1": {UniqueName: "a"},
+                               "3": {UniqueName: "a"},
+                       },
+                       exp: map[string]*pb.PTransform{
+                               "1": {UniqueName: "a"},
+                               "2": {UniqueName: "a'1"},
+                               "3": {UniqueName: "a'2"},
+                       },
+               },
+       }
+
+       for _, test := range tests {
+               actual := ensureUniqueNames(test.in)
+               if !reflect.DeepEqual(actual, test.exp) {
+                       t.Errorf("ensureUniqueName(%v) = %v, want %v", test.in, 
actual, test.exp)
+               }
+       }
+}
+
+func TestComputeInputOutput(t *testing.T) {
+       tests := []struct {
+               in, exp map[string]*pb.PTransform
+       }{
+               { // singleton composite
+                       in: map[string]*pb.PTransform{
+                               "1": {
+                                       UniqueName:    "a",
+                                       Subtransforms: []string{"2"},
+                               },
+                               "2": {
+                                       UniqueName: "b",
+                                       Inputs:     map[string]string{"i0": 
"p1"},
+                                       Outputs:    map[string]string{"i0": 
"p2"},
+                               },
+                       },
+                       exp: map[string]*pb.PTransform{
+                               "1": {
+                                       UniqueName:    "a",
+                                       Subtransforms: []string{"2"},
+                                       Inputs:        map[string]string{"p1": 
"p1"},
+                                       Outputs:       map[string]string{"p2": 
"p2"},
+                               },
+                               "2": {
+                                       UniqueName: "b",
+                                       Inputs:     map[string]string{"i0": 
"p1"},
+                                       Outputs:    map[string]string{"i0": 
"p2"},
+                               },
+                       },
+               },
+               { // closed composite
+                       in: map[string]*pb.PTransform{
+                               "1": {
+                                       UniqueName:    "a",
+                                       Subtransforms: []string{"2", "3"},
+                               },
+                               "2": {UniqueName: "b", Outputs: 
map[string]string{"i0": "p1"}},
+                               "3": {UniqueName: "c", Inputs: 
map[string]string{"i0": "p1"}},
+                       },
+                       exp: map[string]*pb.PTransform{
+                               "1": {
+                                       UniqueName:    "a",
+                                       Subtransforms: []string{"2", "3"},
+                               },
+                               "2": {UniqueName: "b", Outputs: 
map[string]string{"i0": "p1"}},
+                               "3": {UniqueName: "c", Inputs: 
map[string]string{"i0": "p1"}},
+                       },
+               },
+       }
+
+       for _, test := range tests {
+               actual := computeCompositeInputOutput(test.in)
+               if !reflect.DeepEqual(actual, test.exp) {
+                       t.Errorf("coimputeInputOutput(%v) = %v, want %v", 
test.in, actual, test.exp)
+               }
+       }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/util.go 
b/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
new file mode 100644
index 00000000000..eab32be78cf
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipelinex
+
+import pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+
+// Bounded returns true iff all PCollections are bounded.
+func Bounded(p *pb.Pipeline) bool {
+       for _, col := range p.GetComponents().GetPcollections() {
+               if col.IsBounded == pb.IsBounded_UNBOUNDED {
+                       return false
+               }
+       }
+       return true
+}
+
+// ContainerImages returns the set of container images used
+// in the given pipeline.
+func ContainerImages(p *pb.Pipeline) []string {
+       var ret []string
+       for _, t := range p.GetComponents().GetEnvironments() {
+               ret = append(ret, t.Url)
+       }
+       return ret
+}
+
+// TopologicalSort returns a topologically sorted list of the given
+// ids, generally from the same scope/composite. Assumes acyclic graph.
+func TopologicalSort(xforms map[string]*pb.PTransform, ids []string) []string {
+       if len(ids) == 0 {
+               return ids
+       }
+
+       v := newVisiter(xforms, ids)
+       for _, id := range ids {
+               v.visit(xforms, id)
+       }
+       return v.output
+}
+
+type visiter struct {
+       output []string
+       index  int
+       seen   map[string]bool
+       next   map[string][]string // collection -> transforms
+}
+
+func newVisiter(xforms map[string]*pb.PTransform, ids []string) *visiter {
+       ret := &visiter{
+               output: make([]string, len(ids), len(ids)),
+               index:  len(ids) - 1,
+               seen:   make(map[string]bool),
+               next:   make(map[string][]string),
+       }
+       for _, id := range ids {
+               for _, in := range xforms[id].Inputs {
+                       ret.next[in] = append(ret.next[in], id)
+               }
+       }
+       return ret
+}
+
+func (v *visiter) visit(xforms map[string]*pb.PTransform, id string) {
+       if v.seen[id] {
+               return
+       }
+       v.seen[id] = true
+       for _, out := range xforms[id].Outputs {
+               for _, next := range v.next[out] {
+                       v.visit(xforms, next)
+               }
+       }
+
+       v.output[v.index] = id
+       v.index--
+}
diff --git a/sdks/go/pkg/beam/core/util/reflectx/util.go 
b/sdks/go/pkg/beam/core/util/reflectx/util.go
new file mode 100644
index 00000000000..724381fe317
--- /dev/null
+++ b/sdks/go/pkg/beam/core/util/reflectx/util.go
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reflectx
+
+import (
+       "fmt"
+       "reflect"
+)
+
+// ShallowClone creates a shallow copy of the given value. Most
+// useful for slices and maps.
+func ShallowClone(v interface{}) interface{} {
+       if v == nil {
+               return nil
+       }
+       val := reflect.ValueOf(v)
+
+       t := val.Type()
+       switch t.Kind() {
+       case reflect.Slice:
+               if val.IsNil() {
+                       return reflect.Zero(t).Interface() // don't allocate 
for zero values
+               }
+
+               size := val.Len()
+               ret := reflect.MakeSlice(t, size, size)
+               for i := 0; i < size; i++ {
+                       ret.Index(i).Set(val.Index(i))
+               }
+               return ret.Interface()
+
+       case reflect.Map:
+               if val.IsNil() {
+                       return reflect.Zero(t).Interface() // don't allocate 
for zero values
+               }
+
+               ret := reflect.MakeMapWithSize(t, val.Len())
+               keys := val.MapKeys()
+               for _, key := range keys {
+                       ret.SetMapIndex(key, val.MapIndex(key))
+               }
+               return ret.Interface()
+
+       case reflect.Array, reflect.Chan, reflect.Interface, reflect.Func, 
reflect.Invalid:
+               panic(fmt.Sprintf("unsupported type for clone: %v", t))
+
+       default:
+               return v
+       }
+}
+
// UpdateMap merges two maps of type map[K]*V, with the second overwriting
// values into the first (and mutating it). If the overwriting value is nil,
// the key is deleted from the base map.
func UpdateMap(base, updates interface{}) {
	if updates == nil {
		return // ok: nop
	}
	if base == nil {
		panic("base map cannot be nil")
	}

	dst := reflect.ValueOf(base)
	src := reflect.ValueOf(updates)

	if src.Type().Kind() != reflect.Map || dst.Type() != src.Type() {
		panic(fmt.Sprintf("invalid types for map update: %v != %v", dst.Type(), src.Type()))
	}

	for _, key := range src.MapKeys() {
		switch val := src.MapIndex(key); {
		case val.IsNil():
			// A zero reflect.Value deletes the entry.
			dst.SetMapIndex(key, reflect.Value{})
		default:
			dst.SetMapIndex(key, val)
		}
	}
}
diff --git a/sdks/go/pkg/beam/core/util/reflectx/util_test.go 
b/sdks/go/pkg/beam/core/util/reflectx/util_test.go
new file mode 100644
index 00000000000..f306b785f6b
--- /dev/null
+++ b/sdks/go/pkg/beam/core/util/reflectx/util_test.go
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reflectx
+
+import (
+       "fmt"
+       "reflect"
+       "testing"
+)
+
// foo is a small fixture type used as a pointer map value in the
// tests below.
type foo struct {
	N int // arbitrary payload to distinguish instances
}

// String renders the value compactly for readable test-failure output.
func (f *foo) String() string {
	return fmt.Sprintf("&foo{%v}", f.N)
}
+
// TestMergeMaps verifies UpdateMap semantics: overwrite on key collision,
// nil updates as a no-op, disjoint keys merging, and nil values deleting
// keys (including keys absent from the base map).
func TestMergeMaps(t *testing.T) {
	tests := []struct {
		a, b, exp interface{}
	}{
		{ // overwrite existing key
			a:   map[string]*foo{"a": {1}},
			b:   map[string]*foo{"a": {2}},
			exp: map[string]*foo{"a": {2}},
		},
		{ // nil updates: no-op
			a:   map[string]*foo{"a": {1}},
			b:   nil,
			exp: map[string]*foo{"a": {1}},
		},
		{ // disjoint keys: union
			a:   map[string]*foo{"a": {1}},
			b:   map[string]*foo{"b": {1}},
			exp: map[string]*foo{"a": {1}, "b": {1}},
		},
		{ // nil value deletes existing key
			a:   map[string]*foo{"a": {1}},
			b:   map[string]*foo{"a": nil, "b": {1}},
			exp: map[string]*foo{"b": {1}},
		},
		{ // nil value for an absent key is harmless
			a:   map[string]*foo{"a": {1}},
			b:   map[string]*foo{"a": {2}, "not_present": nil},
			exp: map[string]*foo{"a": {2}},
		},
	}

	for _, test := range tests {
		orig := fmt.Sprintf("%v", test.a) // print before mutation

		UpdateMap(test.a, test.b)
		if !reflect.DeepEqual(test.a, test.exp) {
			t.Errorf("UpdateMap(%v,%v) = %v, want %v", orig, test.b, test.a, test.exp)
		}
	}
}
+
+func TestShallowClone(t *testing.T) {
+       tests := []interface{}{
+               nil,
+               2,
+               foo{4},
+               &foo{23},
+               []string{},
+               []string{"a", "c", "e"},
+               map[string]string{},
+               map[string]string{"a": "c", "e": "g"},
+       }
+       for _, test := range tests {
+               actual := ShallowClone(test)
+               if !reflect.DeepEqual(actual, test) {
+                       t.Errorf("ShallowClone(%v) = %v, want id", test, actual)
+               }
+       }
+}
+
+func TestShallowCloneNil(t *testing.T) {
+       var a []string
+
+       ac := ShallowClone(a)
+       if !reflect.DeepEqual(ac, a) {
+               t.Errorf("ShallowClone(%v) = %v, want id", a, ac)
+       }
+
+       var b map[string]string
+
+       bc := ShallowClone(b)
+       if !reflect.DeepEqual(bc, b) {
+               t.Errorf("ShallowClone(%v) = %v, want id", b, bc)
+       }
+}
diff --git a/sdks/go/pkg/beam/core/util/stringx/map.go 
b/sdks/go/pkg/beam/core/util/stringx/map.go
new file mode 100644
index 00000000000..3dec662398e
--- /dev/null
+++ b/sdks/go/pkg/beam/core/util/stringx/map.go
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package stringx
+
+// Keys returns the domain of a map[string]string.
+func Keys(m map[string]string) []string {
+       var keys []string
+       for k := range m {
+               keys = append(keys, k)
+       }
+       return keys
+}
+
+// Values returns the values of a map[string]string.
+func Values(m map[string]string) []string {
+       var values []string
+       for _, v := range m {
+               values = append(values, v)
+       }
+       return values
+}
+
// AnyValue returns an arbitrary value of a map[string]string. Panics
// if the map is empty.
func AnyValue(m map[string]string) string {
	if len(m) == 0 {
		panic("map empty")
	}
	var v string
	for _, v = range m {
		break // any entry will do
	}
	return v
}
+
+// SingleValue returns the single value of a map[string]string.
+// Panics if the map does not contain exactly one value.
+func SingleValue(m map[string]string) string {
+       if len(m) != 1 {
+               panic("map not singleton")
+       }
+       return AnyValue(m)
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index bf7018bdeab..be7a85b8982 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -18,33 +18,26 @@
 package dataflow
 
 import (
-       "bytes"
        "context"
        "encoding/json"
        "errors"
        "flag"
        "fmt"
        "io"
-       "os"
        "path"
+       "sync/atomic"
        "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
-       // Importing to get the side effect of the remote execution hook. See 
init().
-       _ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
        "github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
-       "github.com/apache/beam/sdks/go/pkg/beam/runners/universal/runnerlib"
+       "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow/dataflowlib"
        "github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
        "github.com/apache/beam/sdks/go/pkg/beam/x/hooks/perf"
        "github.com/golang/protobuf/proto"
-       "golang.org/x/oauth2/google"
-       df "google.golang.org/api/dataflow/v1b3"
        "google.golang.org/api/storage/v1"
 )
 
@@ -77,14 +70,13 @@ func init() {
        perf.RegisterProfCaptureHook("gcs_profile_writer", gcsRecorderHook)
 }
 
-type dataflowOptions struct {
-       PipelineURL string `json:"pipelineUrl"`
-       Region      string `json:"region"`
-}
+var unique int32
 
 // Execute runs the given pipeline on Google Cloud Dataflow. It uses the
 // default application credentials to submit the job.
 func Execute(ctx context.Context, p *beam.Pipeline) error {
+       // (1) Gather job options
+
        project := *gcpopts.Project
        if project == "" {
                return errors.New("no Google Cloud project specified. Use 
--project=<project>")
@@ -101,12 +93,6 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                        return fmt.Errorf("error reading --label flag as JSON: 
%v", err)
                }
        }
-       jobName := jobopts.GetJobName()
-
-       edges, nodes, err := p.Build()
-       if err != nil {
-               return err
-       }
 
        if *cpuProfiling != "" {
                perf.EnableProfCaptureHook("gcs_profile_writer", *cpuProfiling)
@@ -123,259 +109,55 @@ func Execute(ctx context.Context, p *beam.Pipeline) 
error {
        }
 
        hooks.SerializeHooksToOptions()
-       options := beam.PipelineOptions.Export()
 
-       // (1) Upload Go binary and model to GCS.
-
-       bin := *jobopts.WorkerBinary
-       if bin == "" {
-               if self, ok := runnerlib.IsWorkerCompatibleBinary(); ok {
-                       bin = self
-                       log.Infof(ctx, "Using running binary as worker binary: 
'%v'", bin)
-               } else {
-                       // Cross-compile as last resort.
-
-                       worker, err := runnerlib.BuildTempWorkerBinary(ctx)
-                       if err != nil {
-                               return err
-                       }
-                       defer os.Remove(worker)
-
-                       bin = worker
-               }
-       } else {
-               log.Infof(ctx, "Using specified worker binary: '%v'", bin)
+       opts := &dataflowlib.JobOptions{
+               Name:           jobopts.GetJobName(),
+               Experiments:    jobopts.GetExperiments(),
+               Options:        beam.PipelineOptions.Export(),
+               Project:        project,
+               Region:         *region,
+               Zone:           *zone,
+               Network:        *network,
+               NumWorkers:     *numWorkers,
+               MachineType:    *machineType,
+               Labels:         jobLabels,
+               TempLocation:   *tempLocation,
+               Worker:         *jobopts.WorkerBinary,
+               TeardownPolicy: *teardownPolicy,
+       }
+       if opts.TempLocation == "" {
+               opts.TempLocation = gcsx.Join(*stagingLocation, "tmp")
        }
 
-       log.Infof(ctx, "Staging worker binary: %v", bin)
+       // (1) Build and submit
 
-       binary, err := stageWorker(ctx, project, *stagingLocation, bin)
+       edges, _, err := p.Build()
        if err != nil {
                return err
        }
-       log.Infof(ctx, "Staged worker binary: %v", binary)
-
        model, err := graphx.Marshal(edges, &graphx.Options{ContainerImageURL: 
*image})
        if err != nil {
                return fmt.Errorf("failed to generate model pipeline: %v", err)
        }
-       log.Info(ctx, proto.MarshalTextString(model))
-
-       modelURL, err := stageModel(ctx, project, *stagingLocation, 
protox.MustEncode(model))
-       if err != nil {
-               return err
-       }
-       log.Infof(ctx, "Staged model pipeline: %v", modelURL)
-
-       // (2) Translate pipeline to v1b3 speak.
-
-       steps, err := translate(edges)
-       if err != nil {
-               return err
-       }
-
-       jobType := "JOB_TYPE_BATCH"
-       apiJobType := "FNAPI_BATCH"
-
-       streaming := !graph.Bounded(nodes)
-       if streaming {
-               jobType = "JOB_TYPE_STREAMING"
-               apiJobType = "FNAPI_STREAMING"
-       }
 
-       job := &df.Job{
-               ProjectId: project,
-               Name:      jobName,
-               Type:      jobType,
-               Environment: &df.Environment{
-                       UserAgent: newMsg(userAgent{
-                               Name:    "Apache Beam SDK for Go",
-                               Version: "0.3.0",
-                       }),
-                       Version: newMsg(version{
-                               JobType: apiJobType,
-                               Major:   "6",
-                       }),
-                       SdkPipelineOptions: newMsg(pipelineOptions{
-                               DisplayData: findPipelineFlags(),
-                               Options: dataflowOptions{
-                                       PipelineURL: modelURL,
-                                       Region:      *region,
-                               },
-                               GoOptions: options,
-                       }),
-                       WorkerPools: []*df.WorkerPool{{
-                               Kind: "harness",
-                               Packages: []*df.Package{{
-                                       Location: binary,
-                                       Name:     "worker",
-                               }},
-                               WorkerHarnessContainerImage: *image,
-                               NumWorkers:                  1,
-                               MachineType:                 *machineType,
-                               Network:                     *network,
-                               Zone:                        *zone,
-                       }},
-                       TempStoragePrefix: *stagingLocation + "/tmp",
-                       Experiments:       append(jobopts.GetExperiments(), 
"beam_fn_api"),
-               },
-               Labels: jobLabels,
-               Steps:  steps,
-       }
-
-       if *numWorkers > 0 {
-               job.Environment.WorkerPools[0].NumWorkers = *numWorkers
-       }
-       if *teardownPolicy != "" {
-               job.Environment.WorkerPools[0].TeardownPolicy = *teardownPolicy
-       }
-       if *tempLocation != "" {
-               job.Environment.TempStoragePrefix = *tempLocation
-       }
-       if streaming {
-               // Add separate data disk for streaming jobs
-               job.Environment.WorkerPools[0].DataDisks = []*df.Disk{{}}
-       }
-       printJob(ctx, job)
+       id := atomic.AddInt32(&unique, 1)
+       modelURL := gcsx.Join(*stagingLocation, fmt.Sprintf("model-%v-%v", id, 
time.Now().UnixNano()))
+       workerURL := gcsx.Join(*stagingLocation, fmt.Sprintf("worker-%v-%v", 
id, time.Now().UnixNano()))
 
        if *dryRun {
                log.Info(ctx, "Dry-run: not submitting job!")
-               return nil
-       }
-
-       // (4) Submit job.
-
-       client, err := newClient(ctx, *endpoint)
-       if err != nil {
-               return err
-       }
-       upd, err := client.Projects.Locations.Jobs.Create(project, *region, 
job).Do()
-       if err != nil {
-               return err
-       }
-
-       log.Infof(ctx, "Submitted job: %v", upd.Id)
-       printJob(ctx, upd)
-       if *endpoint == "" {
-               log.Infof(ctx, "Console: 
https://console.cloud.google.com/dataflow/job/%v?project=%v";, upd.Id, project)
-       }
-       log.Infof(ctx, "Logs: 
https://console.cloud.google.com/logs/viewer?project=%v&resource=dataflow_step%%2Fjob_id%%2F%v";,
 project, upd.Id)
-
-       if *jobopts.Async {
-               return nil
-       }
 
-       time.Sleep(1 * time.Minute)
-       for {
-               j, err := client.Projects.Locations.Jobs.Get(project, *region, 
upd.Id).Do()
+               log.Info(ctx, proto.MarshalTextString(model))
+               job, err := dataflowlib.Translate(model, opts, workerURL, 
modelURL)
                if err != nil {
-                       return fmt.Errorf("failed to get job: %v", err)
+                       return err
                }
-
-               switch j.CurrentState {
-               case "JOB_STATE_DONE":
-                       log.Info(ctx, "Job succeeded!")
-                       return nil
-
-               case "JOB_STATE_CANCELLED":
-                       log.Info(ctx, "Job cancelled")
-                       return nil
-
-               case "JOB_STATE_FAILED":
-                       return fmt.Errorf("job %s failed", upd.Id)
-
-               case "JOB_STATE_RUNNING":
-                       log.Info(ctx, "Job still running ...")
-
-               default:
-                       log.Infof(ctx, "Job state: %v ...", j.CurrentState)
-               }
-
-               time.Sleep(30 * time.Second)
-       }
-}
-
-// stageModel uploads the pipeline model to GCS as a unique object.
-func stageModel(ctx context.Context, project, location string, model []byte) 
(string, error) {
-       bucket, prefix, err := gcsx.ParseObject(location)
-       if err != nil {
-               return "", fmt.Errorf("invalid staging location %v: %v", 
location, err)
-       }
-       obj := path.Join(prefix, fmt.Sprintf("pipeline-%v", 
time.Now().UnixNano()))
-       if *dryRun {
-               full := fmt.Sprintf("gs://%v/%v", bucket, obj)
-               log.Infof(ctx, "Dry-run: not uploading model %v", full)
-               return full, nil
-       }
-
-       client, err := gcsx.NewClient(ctx, storage.DevstorageReadWriteScope)
-       if err != nil {
-               return "", err
-       }
-       return gcsx.Upload(client, project, bucket, obj, bytes.NewReader(model))
-}
-
-// stageWorker uploads the worker binary to GCS as a unique object.
-func stageWorker(ctx context.Context, project, location, worker string) 
(string, error) {
-       bucket, prefix, err := gcsx.ParseObject(location)
-       if err != nil {
-               return "", fmt.Errorf("invalid staging location %v: %v", 
location, err)
-       }
-       obj := path.Join(prefix, fmt.Sprintf("worker-%v", 
time.Now().UnixNano()))
-       if *dryRun {
-               full := fmt.Sprintf("gs://%v/%v", bucket, obj)
-               log.Infof(ctx, "Dry-run: not uploading binary %v", full)
-               return full, nil
-       }
-
-       client, err := gcsx.NewClient(ctx, storage.DevstorageReadWriteScope)
-       if err != nil {
-               return "", err
-       }
-       fd, err := os.Open(worker)
-       if err != nil {
-               return "", fmt.Errorf("failed to open worker binary %s: %v", 
worker, err)
-       }
-       defer fd.Close()
-
-       return gcsx.Upload(client, project, bucket, obj, fd)
-}
-
-func findPipelineFlags() []*displayData {
-       var ret []*displayData
-
-       // TODO(herohde) 2/15/2017: decide if we want all set flags.
-       flag.Visit(func(f *flag.Flag) {
-               ret = append(ret, newDisplayData(f.Name, "", "flag", 
f.Value.(flag.Getter).Get()))
-       })
-
-       return ret
-}
-
-// newClient creates a new dataflow client with default application credentials
-// and CloudPlatformScope. The Dataflow endpoint is optionally overridden.
-func newClient(ctx context.Context, endpoint string) (*df.Service, error) {
-       cl, err := google.DefaultClient(ctx, df.CloudPlatformScope)
-       if err != nil {
-               return nil, err
-       }
-       client, err := df.New(cl)
-       if err != nil {
-               return nil, err
-       }
-       if endpoint != "" {
-               log.Infof(ctx, "Dataflow endpoint override: %s", endpoint)
-               client.BasePath = endpoint
+               dataflowlib.PrintJob(ctx, job)
+               return nil
        }
-       return client, nil
-}
 
-func printJob(ctx context.Context, job *df.Job) {
-       str, err := json.MarshalIndent(job, "", "  ")
-       if err != nil {
-               log.Infof(ctx, "Failed to print job %v: %v", job.Id, err)
-       }
-       log.Info(ctx, string(str))
+       _, err = dataflowlib.Execute(ctx, model, opts, workerURL, modelURL, 
*endpoint, false)
+       return err
 }
 
 func gcsRecorderHook(opts []string) perf.CaptureHook {
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
new file mode 100644
index 00000000000..771ef052dcc
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataflowlib
+
+import (
+       "context"
+       "encoding/json"
+       "os"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+       "github.com/apache/beam/sdks/go/pkg/beam/log"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "github.com/apache/beam/sdks/go/pkg/beam/runners/universal/runnerlib"
+       "github.com/golang/protobuf/proto"
+       df "google.golang.org/api/dataflow/v1b3"
+)
+
// Execute stages the worker binary and the serialized pipeline model
// in GCS (at workerURL and modelURL respectively), translates the
// pipeline to the Dataflow v1b3 job representation, and submits it to
// the given project/region. It returns the Dataflow job id. Unless
// async is set, it also blocks until the job completes.
func Execute(ctx context.Context, p *pb.Pipeline, opts *JobOptions, workerURL, modelURL, endpoint string, async bool) (string, error) {
	// (1) Upload Go binary to GCS.

	bin := opts.Worker
	if bin == "" {
		// No explicit worker binary: reuse the running binary if it was
		// built for the worker platform, otherwise cross-compile one.
		if self, ok := runnerlib.IsWorkerCompatibleBinary(); ok {
			bin = self
			log.Infof(ctx, "Using running binary as worker binary: '%v'", bin)
		} else {
			// Cross-compile as last resort.

			worker, err := runnerlib.BuildTempWorkerBinary(ctx)
			if err != nil {
				return "", err
			}
			defer os.Remove(worker)

			bin = worker
		}
	} else {
		log.Infof(ctx, "Using specified worker binary: '%v'", bin)
	}

	log.Infof(ctx, "Staging worker binary: %v", bin)

	if err := StageWorker(ctx, opts.Project, workerURL, bin); err != nil {
		return "", err
	}
	log.Infof(ctx, "Staged worker binary: %v", workerURL)

	// (2) Fixup and upload model to GCS

	// TODO(herohde): fixup

	log.Info(ctx, proto.MarshalTextString(p))

	if err := StageModel(ctx, opts.Project, modelURL, protox.MustEncode(p)); err != nil {
		return "", err
	}
	log.Infof(ctx, "Staged model pipeline: %v", modelURL)

	// (3) Translate to v1b3 and submit

	job, err := Translate(p, opts, workerURL, modelURL)
	if err != nil {
		return "", err
	}
	PrintJob(ctx, job)

	client, err := NewClient(ctx, endpoint)
	if err != nil {
		return "", err
	}
	upd, err := Submit(ctx, client, opts.Project, opts.Region, job)
	if err != nil {
		return "", err
	}
	log.Infof(ctx, "Submitted job: %v", upd.Id)
	if endpoint == "" {
		// Console link only applies to the default (public) endpoint.
		log.Infof(ctx, "Console: https://console.cloud.google.com/dataflow/job/%v?project=%v", upd.Id, opts.Project)
	}
	log.Infof(ctx, "Logs: https://console.cloud.google.com/logs/viewer?project=%v&resource=dataflow_step%%2Fjob_id%%2F%v", opts.Project, upd.Id)

	if async {
		return upd.Id, nil
	}

	// (4) Wait for completion.

	return upd.Id, WaitForCompletion(ctx, client, opts.Project, opts.Region, upd.Id)
}
+
+// PrintJob logs the Dataflow job.
+func PrintJob(ctx context.Context, job *df.Job) {
+       str, err := json.MarshalIndent(job, "", "  ")
+       if err != nil {
+               log.Infof(ctx, "Failed to print job %v: %v", job.Id, err)
+       }
+       log.Info(ctx, string(str))
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
new file mode 100644
index 00000000000..48e9bedd03b
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataflowlib
+
+import (
+       "context"
+       "fmt"
+       "strings"
+       "time"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+       // Importing to get the side effect of the remote execution hook. See 
init().
+       _ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
+       "github.com/apache/beam/sdks/go/pkg/beam/log"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "golang.org/x/oauth2/google"
+       df "google.golang.org/api/dataflow/v1b3"
+)
+
// JobOptions capture the various options for submitting jobs
// to Dataflow.
type JobOptions struct {
	// Name is the job name.
	Name string
	// Experiments are additional experiments.
	Experiments []string
	// Pipeline options to forward to the workers.
	Options runtime.RawOptions

	// Project is the GCP project to submit the job to.
	Project string
	// Region is the Dataflow region for the job.
	Region string
	// Zone is the GCE zone for the worker pool, if set.
	Zone string
	// Network is the GCE network for the worker pool, if set.
	Network string
	// NumWorkers is the initial worker count; values <= 0 leave the
	// service default in place.
	NumWorkers int64
	// MachineType is the GCE machine type for workers, if set.
	MachineType string
	// Labels are user-provided labels attached to the job.
	Labels map[string]string

	// TempLocation is the GCS prefix used for temporary job storage.
	TempLocation string

	// Worker is the worker binary override.
	Worker string

	// -- Internal use only. Not supported in public Dataflow. --

	// TeardownPolicy overrides the worker-pool teardown policy, if set.
	TeardownPolicy string
}
+
// Translate translates a pipeline to a Dataflow job. It does not
// submit the job: workerURL and modelURL are the (already decided)
// GCS locations of the staged worker binary and pipeline model.
func Translate(p *pb.Pipeline, opts *JobOptions, workerURL, modelURL string) (*df.Job, error) {
	// (1) Translate pipeline to v1b3 speak.

	steps, err := translate(p)
	if err != nil {
		return nil, err
	}

	jobType := "JOB_TYPE_BATCH"
	apiJobType := "FNAPI_BATCH"

	streaming := !pipelinex.Bounded(p)
	if streaming {
		jobType = "JOB_TYPE_STREAMING"
		apiJobType = "FNAPI_STREAMING"
	}

	// The job is configured with a single worker pool, so exactly one
	// container image must be in use across the pipeline.
	images := pipelinex.ContainerImages(p)
	if len(images) != 1 {
		return nil, fmt.Errorf("Dataflow supports one container image only: %v", images)
	}

	job := &df.Job{
		ProjectId: opts.Project,
		Name:      opts.Name,
		Type:      jobType,
		Environment: &df.Environment{
			UserAgent: newMsg(userAgent{
				Name:    "Apache Beam SDK for Go",
				Version: "0.3.0",
			}),
			Version: newMsg(version{
				JobType: apiJobType,
				Major:   "6",
			}),
			SdkPipelineOptions: newMsg(pipelineOptions{
				DisplayData: printOptions(opts, images),
				Options: dataflowOptions{
					PipelineURL: modelURL,
					Region:      opts.Region,
				},
				GoOptions: opts.Options,
			}),
			WorkerPools: []*df.WorkerPool{{
				Kind: "harness",
				Packages: []*df.Package{{
					Location: workerURL,
					Name:     "worker",
				}},
				WorkerHarnessContainerImage: images[0],
				NumWorkers:                  1, // default; overridden below if requested
				MachineType:                 opts.MachineType,
				Network:                     opts.Network,
				Zone:                        opts.Zone,
			}},
			TempStoragePrefix: opts.TempLocation,
			Experiments:       append(opts.Experiments, "beam_fn_api"),
		},
		Labels: opts.Labels,
		Steps:  steps,
	}

	if opts.NumWorkers > 0 {
		job.Environment.WorkerPools[0].NumWorkers = opts.NumWorkers
	}
	if opts.TeardownPolicy != "" {
		job.Environment.WorkerPools[0].TeardownPolicy = opts.TeardownPolicy
	}
	if streaming {
		// Add separate data disk for streaming jobs
		job.Environment.WorkerPools[0].DataDisks = []*df.Disk{{}}
	}
	return job, nil
}
+
+// Submit submits a prepared job to Cloud Dataflow.
+func Submit(ctx context.Context, client *df.Service, project, region string, 
job *df.Job) (*df.Job, error) {
+       return client.Projects.Locations.Jobs.Create(project, region, job).Do()
+}
+
+// WaitForCompletion monitors the given job until completion. It logs any 
messages
+// and state changes received.
+func WaitForCompletion(ctx context.Context, client *df.Service, project, 
region, jobID string) error {
+       for {
+               j, err := client.Projects.Locations.Jobs.Get(project, region, 
jobID).Do()
+               if err != nil {
+                       return fmt.Errorf("failed to get job: %v", err)
+               }
+
+               switch j.CurrentState {
+               case "JOB_STATE_DONE":
+                       log.Info(ctx, "Job succeeded!")
+                       return nil
+
+               case "JOB_STATE_CANCELLED":
+                       log.Info(ctx, "Job cancelled")
+                       return nil
+
+               case "JOB_STATE_FAILED":
+                       return fmt.Errorf("job %s failed", jobID)
+
+               case "JOB_STATE_RUNNING":
+                       log.Info(ctx, "Job still running ...")
+
+               default:
+                       log.Infof(ctx, "Job state: %v ...", j.CurrentState)
+               }
+
+               time.Sleep(30 * time.Second)
+       }
+}
+
+// NewClient creates a new dataflow client with default application credentials
+// and CloudPlatformScope. The Dataflow endpoint is optionally overridden.
+func NewClient(ctx context.Context, endpoint string) (*df.Service, error) {
+       cl, err := google.DefaultClient(ctx, df.CloudPlatformScope)
+       if err != nil {
+               return nil, err
+       }
+       client, err := df.New(cl)
+       if err != nil {
+               return nil, err
+       }
+       if endpoint != "" {
+               log.Infof(ctx, "Dataflow endpoint override: %s", endpoint)
+               client.BasePath = endpoint
+       }
+       return client, nil
+}
+
// dataflowOptions is the JSON-serialized options blob embedded in the job's
// SdkPipelineOptions environment field for the Dataflow service to read.
type dataflowOptions struct {
	PipelineURL string `json:"pipelineUrl"` // staged pipeline model location (see modelURL in job construction)
	Region      string `json:"region"`      // Dataflow region for the job
}
+
+func printOptions(opts *JobOptions, images []string) []*displayData {
+       var ret []*displayData
+       addIfNonEmpty := func(name string, value string) {
+               if value != "" {
+                       ret = append(ret, newDisplayData(name, "", "options", 
value))
+               }
+       }
+
+       addIfNonEmpty("name", opts.Name)
+       addIfNonEmpty("experiments", strings.Join(opts.Experiments, ","))
+       addIfNonEmpty("project", opts.Project)
+       addIfNonEmpty("region", opts.Region)
+       addIfNonEmpty("zone", opts.Zone)
+       addIfNonEmpty("network", opts.Network)
+       addIfNonEmpty("machine_type", opts.MachineType)
+       addIfNonEmpty("container_images", strings.Join(images, ","))
+       addIfNonEmpty("temp_location", opts.TempLocation)
+
+       for k, v := range opts.Options.Options {
+               ret = append(ret, newDisplayData(k, "", "go_options", v))
+       }
+       return ret
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/messages.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go
similarity index 99%
rename from sdks/go/pkg/beam/runners/dataflow/messages.go
rename to sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go
index 4e1dd839df2..a39ec0c15a6 100644
--- a/sdks/go/pkg/beam/runners/dataflow/messages.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package dataflow
+package dataflowlib
 
 import (
        "encoding/json"
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
new file mode 100644
index 00000000000..46f0aed6a3e
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataflowlib
+
+import (
+       "bytes"
+       "context"
+       "fmt"
+       "io"
+       "os"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
+       "google.golang.org/api/storage/v1"
+)
+
+// StageModel uploads the pipeline model to GCS as a unique object.
+func StageModel(ctx context.Context, project, modelURL string, model []byte) 
error {
+       return upload(ctx, project, modelURL, bytes.NewReader(model))
+}
+
+// StageWorker uploads the worker binary to GCS as a unique object.
+func StageWorker(ctx context.Context, project, workerURL, worker string) error 
{
+       fd, err := os.Open(worker)
+       if err != nil {
+               return fmt.Errorf("failed to open worker binary %s: %v", 
worker, err)
+       }
+       defer fd.Close()
+
+       return upload(ctx, project, workerURL, fd)
+}
+
+func upload(ctx context.Context, project, object string, r io.Reader) error {
+       bucket, obj, err := gcsx.ParseObject(object)
+       if err != nil {
+               return fmt.Errorf("invalid staging location %v: %v", object, 
err)
+       }
+       client, err := gcsx.NewClient(ctx, storage.DevstorageReadWriteScope)
+       if err != nil {
+               return err
+       }
+       _, err = gcsx.Upload(client, project, bucket, obj, r)
+       return err
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
new file mode 100644
index 00000000000..56625242c4f
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataflowlib
+
+import (
+       "bytes"
+       "fmt"
+       "net/url"
+       "path"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/stringx"
+       pubsub_v1 "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio/v1"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "github.com/golang/protobuf/proto"
+       df "google.golang.org/api/dataflow/v1b3"
+)
+
// Step kinds understood by the Dataflow service; each translated transform
// is emitted as a step of one of these kinds.
const (
	impulseKind    = "CreateCollection"
	parDoKind      = "ParallelDo"
	flattenKind    = "Flatten"
	gbkKind        = "GroupByKey"
	windowIntoKind = "Bucket"

	// Conversion step inserted before each side input consumer.
	sideInputKind = "CollectionToSingleton"

	// Support for Dataflow native I/O, such as PubSub.
	readKind  = "ParallelRead"
	writeKind = "ParallelWrite"
)
+
+// translate translates a pipeline into a sequence of Dataflow steps. The step
+// representation and its semantics are complex. In particular, the service
+// optimizes the steps (step fusing, etc.) and may move steps around. Our
+// decorations of the steps must thus be robust against such changes, so that
+// they can be properly decoded in the harness. There are multiple quirks and
+// requirements of specific semi-opaque formats, such as base64 encoded blobs.
+//
+// Moreover, the harness sees pieces of the translated steps only -- not the
+// full graph. Special steps are also inserted around GBK, for example, which
+// makes the placement of the decoration somewhat tricky. The harness will
+// also never see steps that the service executes directly, notably GBK/CoGBK.
+func translate(p *pb.Pipeline) ([]*df.Step, error) {
+       // NOTE: Dataflow apparently assumes that the steps are in topological 
order.
+       // Otherwise, it fails with "Output out for step  was not found.". We 
assume
+       // the pipeline has been normalized and each subtransform list is in 
such order.
+
+       x := newTranslator(p.GetComponents())
+       return x.translateTransforms("", p.GetRootTransformIds())
+}
+
// translator carries the shared state needed to convert pipeline proto
// components into Dataflow steps.
type translator struct {
	comp          *pb.Components
	pcollections  map[string]*outputReference // PCollection id -> generating step output
	coders        *graphx.CoderUnmarshaller
	bogusCoderRef *graphx.CoderRef // windowed bytes coder used for inserted placeholder outputs
}
+
+func newTranslator(comp *pb.Components) *translator {
+       bytesCoderRef, _ := graphx.EncodeCoderRef(coder.NewW(coder.NewBytes(), 
coder.NewGlobalWindow()))
+
+       return &translator{
+               comp:          comp,
+               pcollections:  makeOutputReferences(comp.GetTransforms()),
+               coders:        graphx.NewCoderUnmarshaller(comp.GetCoders()),
+               bogusCoderRef: bytesCoderRef,
+       }
+}
+
+func (x *translator) translateTransforms(trunk string, ids []string) 
([]*df.Step, error) {
+       var steps []*df.Step
+       for _, id := range ids {
+               sub, err := x.translateTransform(trunk, id)
+               if err != nil {
+                       return nil, err
+               }
+               steps = append(steps, sub...)
+       }
+       return steps, nil
+}
+
// translateTransform translates a single transform into one or more Dataflow
// steps, dispatching on the transform's urn. Composites (the default case
// with subtransforms) are translated recursively with an extended trunk name;
// an unknown primitive urn is an error. Note that prop is a value, so each
// case decorates its own copy before building the step.
func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, error) {
	t := x.comp.Transforms[id]

	// Common properties shared by all step kinds: the UI name and outputs.
	prop := properties{
		UserName:   userName(trunk, t.UniqueName),
		OutputInfo: x.translateOutputs(t.Outputs),
	}

	urn := t.GetSpec().GetUrn()
	switch urn {
	case graphx.URNImpulse:
		// NOTE: The impulse []data value is encoded in a special way as a
		// URL Query-escaped windowed _unnested_ value. It is read back in
		// a nested context at runtime.
		var buf bytes.Buffer
		if err := exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()), window.SingleGlobalWindow, mtime.ZeroTimestamp, &buf); err != nil {
			return nil, err
		}
		value := string(append(buf.Bytes(), t.GetSpec().Payload...))
		// log.Printf("Impulse data: %v", url.QueryEscape(value))

		prop.Element = []string{url.QueryEscape(value)}
		return []*df.Step{x.newStep(id, impulseKind, prop)}, nil

	case graphx.URNParDo:
		var payload pb.ParDoPayload
		if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil {
			return nil, fmt.Errorf("invalid ParDo payload for %v: %v", t, err)
		}

		var steps []*df.Step
		// rem tracks the inputs not consumed as side inputs; after the loop
		// exactly one (the main input) should remain.
		rem := reflectx.ShallowClone(t.Inputs).(map[string]string)

		prop.NonParallelInputs = make(map[string]*outputReference)
		for key := range payload.SideInputs {
			// Side inputs require an additional conversion step, which must
			// be before the present one.
			delete(rem, key)

			pcol := x.comp.Pcollections[t.Inputs[key]]
			ref := x.pcollections[t.Inputs[key]]
			c := x.translateCoder(pcol, pcol.CoderId)

			side := &df.Step{
				Name: fmt.Sprintf("view%v_%v", id, key),
				Kind: sideInputKind,
				Properties: newMsg(properties{
					ParallelInput: ref,
					OutputInfo: []output{{
						UserName:   "i0",
						OutputName: "i0",
						Encoding:   graphx.WrapIterable(c),
					}},
					UserName: userName(trunk, fmt.Sprintf("AsView%v_%v", id, key)),
				}),
			}
			steps = append(steps, side)

			// The ParDo step consumes the conversion step's output.
			prop.NonParallelInputs[side.Name] = newOutputReference(side.Name, "i0")
		}

		in := stringx.SingleValue(rem)

		prop.ParallelInput = x.pcollections[in]
		prop.SerializedFn = id // == reference into the proto pipeline
		return append(steps, x.newStep(id, parDoKind, prop)), nil

	case graphx.URNFlatten:
		for _, in := range t.Inputs {
			prop.Inputs = append(prop.Inputs, x.pcollections[in])
		}
		return []*df.Step{x.newStep(id, flattenKind, prop)}, nil

	case graphx.URNGBK:
		in := stringx.SingleValue(t.Inputs)

		prop.ParallelInput = x.pcollections[in]
		prop.DisallowCombinerLifting = true
		// GBK is executed by the service; it needs the windowing strategy.
		prop.SerializedFn = encodeSerializedFn(x.extractWindowingStrategy(in))
		return []*df.Step{x.newStep(id, gbkKind, prop)}, nil

	case graphx.URNWindow:
		in := stringx.SingleValue(t.Inputs)
		out := stringx.SingleValue(t.Outputs)

		// The target strategy is the one on the output collection.
		prop.ParallelInput = x.pcollections[in]
		prop.SerializedFn = encodeSerializedFn(x.extractWindowingStrategy(out))
		return []*df.Step{x.newStep(id, windowIntoKind, prop)}, nil

	case pubsub_v1.PubSubPayloadURN:
		// Translate to native handling of PubSub I/O.

		var msg pubsub_v1.PubSubPayload
		if err := proto.Unmarshal(t.Spec.Payload, &msg); err != nil {
			return nil, fmt.Errorf("bad pubsub payload: %v", err)
		}

		prop.Format = "pubsub"
		prop.PubSubTopic = msg.GetTopic()
		prop.PubSubSubscription = msg.GetSubscription()
		prop.PubSubIDLabel = msg.GetIdAttribute()
		prop.PubSubTimestampLabel = msg.GetTimestampAttribute()
		prop.PubSubWithAttributes = msg.GetWithAttributes()

		// A subscription takes precedence over a topic if both are set.
		if prop.PubSubSubscription != "" {
			prop.PubSubTopic = ""
		}

		switch msg.Op {
		case pubsub_v1.PubSubPayload_READ:
			return []*df.Step{x.newStep(id, readKind, prop)}, nil

		case pubsub_v1.PubSubPayload_WRITE:
			in := stringx.SingleValue(t.Inputs)

			prop.ParallelInput = x.pcollections[in]
			prop.Encoding = x.wrapCoder(x.comp.Pcollections[in], coder.NewBytes())
			return []*df.Step{x.newStep(id, writeKind, prop)}, nil

		default:
			return nil, fmt.Errorf("bad pubsub op: %v", msg.Op)
		}

	default:
		// TODO: graphx.URNCombinePerKey:

		if len(t.Subtransforms) > 0 {
			return x.translateTransforms(fmt.Sprintf("%v%v/", trunk, path.Base(t.UniqueName)), t.Subtransforms)
		}

		return nil, fmt.Errorf("unexpected primitive urn: %v", t)
	}
}
+
+func (x *translator) newStep(id, kind string, prop properties) *df.Step {
+       step := &df.Step{
+               Name:       id,
+               Kind:       kind,
+               Properties: newMsg(prop),
+       }
+       if prop.PubSubWithAttributes {
+               // Hack to add a empty-value property for PubSub IO. This
+               // will make PubSub send the entire message, not just
+               // the payload.
+               prop.PubSubWithAttributes = false
+               step.Properties = 
newMsg(propertiesWithPubSubMessage{properties: prop})
+       }
+       return step
+}
+
+func (x *translator) translateOutputs(outputs map[string]string) []output {
+       var ret []output
+       for _, out := range outputs {
+               pcol := x.comp.Pcollections[out]
+               ref := x.pcollections[out]
+
+               info := output{
+                       UserName:   ref.OutputName,
+                       OutputName: ref.OutputName,
+                       Encoding:   x.translateCoder(pcol, pcol.CoderId),
+               }
+               ret = append(ret, info)
+       }
+       if len(ret) == 0 {
+               // Dataflow seems to require at least one output. We insert
+               // a bogus one (named "bogus") and remove it in the harness.
+
+               ret = []output{{
+                       UserName:   "bogus",
+                       OutputName: "bogus",
+                       Encoding:   x.bogusCoderRef,
+               }}
+       }
+       return ret
+}
+
+func (x *translator) translateCoder(pcol *pb.PCollection, id string) 
*graphx.CoderRef {
+       c, err := x.coders.Coder(id)
+       if err != nil {
+               panic(err)
+       }
+       return x.wrapCoder(pcol, c)
+}
+
+func (x *translator) wrapCoder(pcol *pb.PCollection, c *coder.Coder) 
*graphx.CoderRef {
+       // TODO(herohde) 3/16/2018: ensure windowed values for Dataflow
+
+       ws := x.comp.WindowingStrategies[pcol.WindowingStrategyId]
+       wc, err := x.coders.WindowCoder(ws.WindowCoderId)
+       if err != nil {
+               panic(fmt.Sprintf("failed to decode window coder %v for 
windowing strategy %v: %v", ws.WindowCoderId, pcol.WindowingStrategyId, err))
+       }
+       ret, err := graphx.EncodeCoderRef(coder.NewW(c, wc))
+       if err != nil {
+               panic(fmt.Sprintf("failed to wrap coder %v for windowing 
strategy %v: %v", c, pcol.WindowingStrategyId, err))
+       }
+       return ret
+}
+
+// extractWindowingStrategy returns a self-contained windowing strategy from
+// the given pcollection id.
+func (x *translator) extractWindowingStrategy(pid string) 
*pb.MessageWithComponents {
+       ws := 
x.comp.WindowingStrategies[x.comp.Pcollections[pid].WindowingStrategyId]
+
+       msg := &pb.MessageWithComponents{
+               Components: &pb.Components{
+                       Coders: pipelinex.TrimCoders(x.comp.Coders, 
ws.WindowCoderId),
+               },
+               Root: &pb.MessageWithComponents_WindowingStrategy{
+                       WindowingStrategy: ws,
+               },
+       }
+       return msg
+}
+
+// makeOutputReferences builds a map from PCollection id to the Dataflow 
representation.
+// Each output is named after the generating transform.
+func makeOutputReferences(xforms map[string]*pb.PTransform) 
map[string]*outputReference {
+       ret := make(map[string]*outputReference)
+       for id, t := range xforms {
+               if len(t.Subtransforms) > 0 {
+                       continue // ignore composites
+               }
+               for name, out := range t.Outputs {
+                       ret[out] = newOutputReference(id, name)
+               }
+       }
+       return ret
+}
+
// userName computes a Dataflow composite name understood by the Dataflow UI,
// determined by the scope nesting. Dataflow simply uses "/" to separate
// composite transforms, so we must remove them from the otherwise qualified
// package names of DoFns, etc. Assumes trunk ends in / or is empty.
func userName(trunk, name string) string {
	return fmt.Sprintf("%s%s", trunk, path.Base(name))
}
+
+func encodeSerializedFn(in proto.Message) string {
+       // The Beam Runner API uses percent-encoding for serialized fn messages.
+       // See: https://en.wikipedia.org/wiki/Percent-encoding
+
+       data := protox.MustEncode(in)
+       return url.PathEscape(string(data))
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/translate.go 
b/sdks/go/pkg/beam/runners/dataflow/translate.go
deleted file mode 100644
index d8644ef9273..00000000000
--- a/sdks/go/pkg/beam/runners/dataflow/translate.go
+++ /dev/null
@@ -1,479 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package dataflow
-
-import (
-       "bytes"
-       "fmt"
-       "net/url"
-       "path"
-
-       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
-       pubsub_v1 "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio/v1"
-       rnapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
-       df "google.golang.org/api/dataflow/v1b3"
-)
-
-const (
-       impulseKind    = "CreateCollection"
-       parDoKind      = "ParallelDo"
-       flattenKind    = "Flatten"
-       gbkKind        = "GroupByKey"
-       windowIntoKind = "Bucket"
-
-       sideInputKind = "CollectionToSingleton"
-
-       // Support for Dataflow native I/O, such as PubSub.
-       readKind  = "ParallelRead"
-       writeKind = "ParallelWrite"
-)
-
-// translate translates a Graph into a sequence of Dataflow steps. The step
-// representation and its semantics are complex. In particular, the service
-// optimizes the steps (step fusing, etc.) and may move steps around. Our
-// decorations of the steps must thus be robust against such changes, so that
-// they can be properly decoded in the harness. There are multiple quirks and
-// requirements of specific semi-opaque formats, such as base64 encoded blobs.
-//
-// Moreover, the harness sees pieces of the translated steps only -- not the
-// full graph. Special steps are also inserted around GBK, for example, which
-// makes the placement of the decoration somewhat tricky. The harness will
-// also never see steps that the service executes directly, notably GBK/CoGBK.
-func translate(edges []*graph.MultiEdge) ([]*df.Step, error) {
-       // NOTE: Dataflow apparently assumes that the steps are in topological 
order.
-       // Otherwise, it fails with "Output out for step  was not found." So we
-       // preserve the creation order.
-
-       nodes := translateNodes(edges)
-
-       var steps []*df.Step
-       for _, edge := range edges {
-               if edge.Op == graph.CoGBK && len(edge.Input) > 1 {
-                       expanded, err := expandCoGBK(nodes, edge)
-                       if err != nil {
-                               return nil, err
-                       }
-                       steps = append(steps, expanded...)
-                       continue
-               }
-
-               kind, prop, err := translateEdge(edge)
-               if err != nil {
-                       return nil, err
-               }
-
-               if edge.Op == graph.Flatten {
-                       for _, in := range edge.Input {
-                               prop.Inputs = append(prop.Inputs, 
nodes[in.From.ID()])
-                       }
-               } else if len(edge.Input) > 0 {
-                       prop.ParallelInput = nodes[edge.Input[0].From.ID()]
-
-                       prop.NonParallelInputs = 
make(map[string]*outputReference)
-                       for i := 1; i < len(edge.Input); i++ {
-                               // Side input requires an additional conversion 
step, which must
-                               // be before the present one.
-
-                               ref := nodes[edge.Input[i].From.ID()]
-                               c, err := 
encodeCoderRef(edge.Input[i].From.Coder, 
edge.Input[i].From.WindowingStrategy().Fn.Coder())
-                               if err != nil {
-                                       return nil, err
-                               }
-
-                               side := &df.Step{
-                                       Name: fmt.Sprintf("view%v_%v", 
edge.ID(), i),
-                                       Kind: sideInputKind,
-                                       Properties: newMsg(properties{
-                                               ParallelInput: ref,
-                                               OutputInfo: []output{{
-                                                       UserName:   "out",
-                                                       OutputName: "out",
-                                                       Encoding:   
graphx.WrapExtraWindowedValue(c),
-                                               }},
-                                               UserName: 
buildName(edge.Scope(), "AsView"),
-                                       }),
-                               }
-                               steps = append(steps, side)
-
-                               prop.NonParallelInputs[side.Name] = 
newOutputReference(side.Name, "out")
-                       }
-               }
-
-               for _, out := range edge.Output {
-                       ref := nodes[out.To.ID()]
-                       coder, err := encodeCoderRef(out.To.Coder, 
out.To.WindowingStrategy().Fn.Coder())
-                       if err != nil {
-                               return nil, err
-                       }
-                       info := output{
-                               UserName:   ref.OutputName,
-                               OutputName: ref.OutputName,
-                               Encoding:   coder,
-                       }
-                       prop.OutputInfo = append(prop.OutputInfo, info)
-               }
-               if len(prop.OutputInfo) == 0 {
-                       // Dataflow seems to require at least one output. We 
insert
-                       // a bogus one (named "bogus") and remove it in the 
harness.
-
-                       coder, err := encodeCoderRef(edge.Input[0].From.Coder, 
edge.Input[0].From.WindowingStrategy().Fn.Coder())
-                       if err != nil {
-                               return nil, err
-                       }
-
-                       prop.OutputInfo = []output{{
-                               UserName:   "bogus",
-                               OutputName: "bogus",
-                               Encoding:   coder,
-                       }}
-               }
-
-               step := &df.Step{
-                       Name:       stepID(edge.ID()),
-                       Kind:       kind,
-                       Properties: newMsg(prop),
-               }
-               if prop.PubSubWithAttributes {
-                       // Hack to add a empty-value property for PubSub IO. 
This
-                       // will make PubSub send the entire message, not just
-                       // the payload.
-                       prop.PubSubWithAttributes = false
-                       step.Properties = 
newMsg(propertiesWithPubSubMessage{properties: prop})
-               }
-               steps = append(steps, step)
-       }
-
-       return steps, nil
-}
-
-func expandCoGBK(nodes map[int]*outputReference, edge *graph.MultiEdge) 
([]*df.Step, error) {
-       // TODO(BEAM-490): replace once CoGBK is a primitive. For now, we have 
to translate
-       // CoGBK with multiple PCollections as described in graphx/cogbk.go.
-
-       id := graphx.StableMultiEdgeID(edge)
-       wc := edge.Output[0].To.WindowingStrategy().Fn.Coder()
-       kvCoder, err := encodeCoderRef(graphx.MakeKVUnionCoder(edge), wc)
-       if err != nil {
-               return nil, err
-       }
-       gbkCoder, err := encodeCoderRef(graphx.MakeGBKUnionCoder(edge), wc)
-       if err != nil {
-               return nil, err
-       }
-
-       var steps []*df.Step
-
-       var inputs []*outputReference
-       for i, in := range edge.Input {
-               // Inject(i)
-
-               injectID := graphx.StableCoGBKInjectID(id, i)
-               out := newOutputReference(injectID, "out")
-
-               inject := &df.Step{
-                       Name: injectID,
-                       Kind: parDoKind,
-                       Properties: newMsg(properties{
-                               ParallelInput: nodes[in.From.ID()],
-                               SerializedFn: 
makeSerializedFnPayload(&v1.TransformPayload{
-                                       Urn:    graphx.URNInject,
-                                       Inject: &v1.InjectPayload{N: 
(int32)(i)},
-                               }),
-                               OutputInfo: []output{{
-                                       UserName:   out.OutputName,
-                                       OutputName: out.OutputName,
-                                       Encoding:   kvCoder,
-                               }},
-                               UserName: buildName(edge.Scope(), injectID),
-                       }),
-               }
-
-               inputs = append(inputs, out)
-               steps = append(steps, inject)
-       }
-
-       // Flatten
-
-       flattenID := graphx.StableCoGBKFlattenID(id)
-       out := newOutputReference(flattenID, "out")
-
-       flatten := &df.Step{
-               Name: flattenID,
-               Kind: flattenKind,
-               Properties: newMsg(properties{
-                       Inputs: inputs,
-                       OutputInfo: []output{{
-                               UserName:   out.OutputName,
-                               OutputName: out.OutputName,
-                               Encoding:   kvCoder,
-                       }},
-                       UserName: buildName(edge.Scope(), flattenID),
-               }),
-       }
-       steps = append(steps, flatten)
-
-       // GBK
-
-       gbkID := graphx.StableCoGBKGBKID(id)
-       gbkOut := newOutputReference(gbkID, "out")
-
-       w := edge.Input[0].From.WindowingStrategy()
-       sfn, err := encodeSerializedFn(translateWindowingStrategy(w))
-       if err != nil {
-               return nil, err
-       }
-
-       gbk := &df.Step{
-               Name: gbkID,
-               Kind: gbkKind,
-               Properties: newMsg(properties{
-                       ParallelInput: out,
-                       OutputInfo: []output{{
-                               UserName:   gbkOut.OutputName,
-                               OutputName: gbkOut.OutputName,
-                               Encoding:   gbkCoder,
-                       }},
-                       UserName:                buildName(edge.Scope(), 
"group"),
-                       DisallowCombinerLifting: true,
-                       SerializedFn:            sfn,
-               }),
-       }
-       steps = append(steps, gbk)
-
-       // Expand
-
-       ref := nodes[edge.Output[0].To.ID()]
-       coder, err := encodeCoderRef(edge.Output[0].To.Coder, wc)
-       if err != nil {
-               return nil, err
-       }
-
-       expand := &df.Step{
-               Name: stepID(edge.ID()), // the successor references this name.
-               Kind: parDoKind,
-               Properties: newMsg(properties{
-                       ParallelInput: gbkOut,
-                       OutputInfo: []output{{
-                               UserName:   ref.OutputName,
-                               OutputName: ref.OutputName,
-                               Encoding:   coder,
-                       }},
-                       UserName: buildName(edge.Scope(), "expand"),
-                       SerializedFn: 
makeSerializedFnPayload(&v1.TransformPayload{
-                               Urn: graphx.URNExpand,
-                       }),
-               }),
-       }
-       steps = append(steps, expand)
-
-       return steps, nil
-}
-
-// translateNodes builds a map from nodeID to the Dataflow representation.
-func translateNodes(edges []*graph.MultiEdge) map[int]*outputReference {
-       nodes := make(map[int]*outputReference)
-       for _, edge := range edges {
-               for i, out := range edge.Output {
-                       // Encode the ordering to match the scheme in 
exec/translate/unmarshalKeyedValues()
-                       name := fmt.Sprintf("i%v", i)
-                       nodes[out.To.ID()] = 
newOutputReference(stepID(edge.ID()), name)
-               }
-       }
-       return nodes
-}
-
-// TODO(herohde) 2/15/2017: user names encode composite names via 
"/"-separation.
-// We'll need to ensure that scopes are uniquely named.
-
-// translateEdge translates part of a MultiEdge to the Dataflow kind and
-// step-specific properties. We can't conveniently return a Step, because we
-// need to add more properties and the (partly-encoded) Step form prevents such
-// updates.
-func translateEdge(edge *graph.MultiEdge) (string, properties, error) {
-       switch edge.Op {
-       case graph.Impulse:
-               // NOTE: The impulse []data value is encoded in a special way 
as a
-               // URL Query-escaped windowed _unnested_ value. It is read back 
in
-               // a nested context at runtime.
-               var buf bytes.Buffer
-               if err := 
exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()), 
window.SingleGlobalWindow, mtime.ZeroTimestamp, &buf); err != nil {
-                       return "", properties{}, err
-               }
-               value := string(append(buf.Bytes(), edge.Value...))
-
-               // log.Printf("Impulse data: %v", url.QueryEscape(value))
-
-               return impulseKind, properties{
-                       UserName: buildName(edge.Scope(), "create"),
-                       Element:  []string{url.QueryEscape(value)},
-               }, nil
-
-       case graph.ParDo:
-               return parDoKind, properties{
-                       UserName:     buildName(edge.Scope(), edge.DoFn.Name()),
-                       SerializedFn: graphx.StableMultiEdgeID(edge),
-               }, nil
-
-       case graph.Combine:
-               // TODO(flaviocf) 8/08/2017: When combiners are supported, 
change "ParallelDo" to
-               // "CombineValues", encode accumulator coder and pass it as a 
property "Encoding".
-               return parDoKind, properties{
-                       UserName:     buildName(edge.Scope(), 
edge.CombineFn.Name()),
-                       SerializedFn: graphx.StableMultiEdgeID(edge),
-               }, nil
-
-       case graph.WindowInto:
-               w := edge.Output[0].To.WindowingStrategy()
-               sfn, err := encodeSerializedFn(translateWindowingStrategy(w))
-               if err != nil {
-                       return "", properties{}, err
-               }
-               return windowIntoKind, properties{
-                       UserName:     buildName(edge.Scope(), "window"), // 
TODO: user-defined
-                       SerializedFn: sfn,
-               }, nil
-
-       case graph.CoGBK:
-               w := edge.Input[0].From.WindowingStrategy()
-               sfn, err := encodeSerializedFn(translateWindowingStrategy(w))
-               if err != nil {
-                       return "", properties{}, err
-               }
-               return gbkKind, properties{
-                       UserName:                buildName(edge.Scope(), 
"group"), // TODO: user-defined
-                       DisallowCombinerLifting: true,
-                       SerializedFn:            sfn,
-               }, nil
-
-       case graph.Flatten:
-               return flattenKind, properties{
-                       UserName: buildName(edge.Scope(), "flatten"), // TODO: 
user-defined
-               }, nil
-
-       case graph.External:
-               switch edge.Payload.URN {
-               case pubsub_v1.PubSubPayloadURN:
-                       // Translate to native handling of PubSub I/O.
-
-                       var msg pubsub_v1.PubSubPayload
-                       if err := proto.Unmarshal(edge.Payload.Data, &msg); err 
!= nil {
-                               return "", properties{}, fmt.Errorf("bad pubsub 
payload: %v", err)
-                       }
-                       prop := properties{
-                               UserName:             buildName(edge.Scope(), 
fmt.Sprintf("%v", msg.Op)),
-                               Format:               "pubsub",
-                               PubSubTopic:          msg.GetTopic(),
-                               PubSubSubscription:   msg.GetSubscription(),
-                               PubSubIDLabel:        msg.GetIdAttribute(),
-                               PubSubTimestampLabel: 
msg.GetTimestampAttribute(),
-                               PubSubWithAttributes: msg.GetWithAttributes(),
-                       }
-                       if prop.PubSubSubscription != "" {
-                               prop.PubSubTopic = ""
-                       }
-
-                       switch msg.Op {
-                       case pubsub_v1.PubSubPayload_READ:
-                               return readKind, prop, nil
-
-                       case pubsub_v1.PubSubPayload_WRITE:
-                               c, err := encodeCoderRef(coder.NewBytes(), 
edge.Input[0].From.WindowingStrategy().Fn.Coder())
-                               if err != nil {
-                                       return "", properties{}, 
fmt.Errorf("bad window coder: %v", err)
-                               }
-                               prop.Encoding = c
-                               return writeKind, prop, nil
-
-                       default:
-                               return "", properties{}, fmt.Errorf("bad pubsub 
op: %v", msg.Op)
-                       }
-
-               default:
-                       return "", properties{}, fmt.Errorf("bad external urn: 
%v", edge.Payload.URN)
-               }
-
-       default:
-               return "", properties{}, fmt.Errorf("bad opcode: %v", edge)
-       }
-}
-
-func makeSerializedFnPayload(payload *v1.TransformPayload) string {
-       return protox.MustEncodeBase64(payload)
-}
-
-func encodeCoderRef(c *coder.Coder, wc *coder.WindowCoder) (*graphx.CoderRef, 
error) {
-       // TODO(herohde) 3/16/2018: ensure windowed values for Dataflow
-       return graphx.EncodeCoderRef(coder.NewW(c, wc))
-}
-
-// buildName computes a Dataflow composite name understood by the Dataflow UI,
-// determined by the scope nesting. Dataflow simply uses "/" to separate
-// composite transforms, so we must remove them from the otherwise qualified
-// package names of DoFns, etc.
-func buildName(scope *graph.Scope, name string) string {
-       if scope.Parent == nil {
-               // Ignore "root" node in naming.
-               return path.Base(name)
-       }
-       return buildScopeName(scope) + "/" + path.Base(name)
-}
-
-func buildScopeName(scope *graph.Scope) string {
-       if scope.Parent.Parent == nil {
-               return scope.Label
-       }
-       return buildScopeName(scope.Parent) + "/" + scope.Label
-}
-
-// stepID converts a MultiEdge ID, i, to a Step ID, "s"+i. The name has no
-// semantic meaning.
-func stepID(id int) string {
-       return fmt.Sprintf("s%v", id)
-}
-
-func translateWindowingStrategy(w *window.WindowingStrategy) proto.Message {
-       c := graphx.NewCoderMarshaller()
-       ws := graphx.MarshalWindowingStrategy(c, w)
-
-       msg := &rnapi_pb.MessageWithComponents{
-               Components: &rnapi_pb.Components{
-                       Coders: c.Build(),
-               },
-               Root: &rnapi_pb.MessageWithComponents_WindowingStrategy{
-                       WindowingStrategy: ws,
-               },
-       }
-       return msg
-}
-
-func encodeSerializedFn(in proto.Message) (string, error) {
-       // The Beam Runner API uses percent-encoding for serialized fn messages.
-       // See: https://en.wikipedia.org/wiki/Percent-encoding
-
-       data, err := proto.Marshal(in)
-       if err != nil {
-               return "", err
-       }
-       return url.PathEscape(string(data)), nil
-}
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go 
b/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go
index efb82ba40a7..1dae66e1442 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go
@@ -27,6 +27,8 @@ import (
        "strings"
        "time"
 
+       "sync/atomic"
+
        "github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
@@ -39,10 +41,13 @@ func IsWorkerCompatibleBinary() (string, bool) {
        return "", false
 }
 
+var unique int32
+
 // BuildTempWorkerBinary creates a local worker binary in the tmp directory
 // for linux/amd64. Caller responsible for deleting the binary.
 func BuildTempWorkerBinary(ctx context.Context) (string, error) {
-       filename := filepath.Join(os.TempDir(), fmt.Sprintf("beam-go-%v", 
time.Now().UnixNano()))
+       id := atomic.AddInt32(&unique, 1)
+       filename := filepath.Join(os.TempDir(), fmt.Sprintf("worker-%v-%v", id, 
time.Now().UnixNano()))
        if err := BuildWorkerBinary(ctx, filename); err != nil {
                return "", err
        }
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go 
b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
index d1b5b3c346b..c9640b2d59b 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
@@ -22,6 +22,7 @@ import (
 
        "github.com/apache/beam/sdks/go/pkg/beam"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
@@ -46,6 +47,7 @@ type JobOptions struct {
 // Prepare prepares a job to the given job service. It returns the preparation 
id
 // artifact staging endpoint, and staging token if successful.
 func Prepare(ctx context.Context, client jobpb.JobServiceClient, p 
*pb.Pipeline, opt *JobOptions) (id, endpoint, stagingToken string, err error) {
+       hooks.SerializeHooksToOptions()
        raw := runtime.RawOptionsWrapper{
                Options:     beam.PipelineOptions.Export(),
                AppName:     opt.Name,
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go 
b/sdks/go/pkg/beam/runners/universal/universal.go
index 8b592d3bdf2..e3344692fa3 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -51,9 +51,9 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        }
 
        opt := &runnerlib.JobOptions{
-               Name:               jobopts.GetJobName(),
-               Experiments:        jobopts.GetExperiments(),
-               Worker:             *jobopts.WorkerBinary,
+               Name:        jobopts.GetJobName(),
+               Experiments: jobopts.GetExperiments(),
+               Worker:      *jobopts.WorkerBinary,
        }
        _, err = runnerlib.Execute(ctx, pipeline, endpoint, opt, *jobopts.Async)
        return err
diff --git a/sdks/go/pkg/beam/testing/passert/sum.go 
b/sdks/go/pkg/beam/testing/passert/sum.go
new file mode 100644
index 00000000000..338beff3002
--- /dev/null
+++ b/sdks/go/pkg/beam/testing/passert/sum.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package passert
+
+import (
+       "fmt"
+       "reflect"
+
+       "github.com/apache/beam/sdks/go/pkg/beam"
+)
+
+func init() {
+       beam.RegisterType(reflect.TypeOf((*sumFn)(nil)).Elem())
+}
+
+// Sum validates that the incoming PCollection<int> contains the given
+// number of elements and that they sum to the given value. Specialized
+// version of Equals that avoids a lot of machinery for testing.
+func Sum(s beam.Scope, col beam.PCollection, name string, size, value int) {
+       s = s.Scope(fmt.Sprintf("passert.Sum(%v)", name))
+
+       keyed := beam.AddFixedKey(s, col)
+       grouped := beam.GroupByKey(s, keyed)
+       beam.ParDo0(s, &sumFn{Name: name, Size: size, Sum: value}, grouped)
+}
+
+type sumFn struct {
+       Name string `json:"name,omitempty"`
+       Size int    `json:"size,omitempty"`
+       Sum  int    `json:"sum,omitempty"`
+}
+
+func (f *sumFn) ProcessElement(_ int, values func(*int) bool) error {
+       var sum, count, i int
+       for values(&i) {
+               count++
+               sum += i
+       }
+
+       if f.Sum != sum || f.Size != count {
+               return fmt.Errorf("passert.Sum(%v) = {%v, size: %v}, want {%v, 
size:%v}", f.Name, sum, count, f.Sum, f.Size)
+       }
+       return nil
+}
diff --git a/sdks/go/pkg/beam/util/gcsx/gcs.go 
b/sdks/go/pkg/beam/util/gcsx/gcs.go
index 666a837fb0f..dd9470fa7a4 100644
--- a/sdks/go/pkg/beam/util/gcsx/gcs.go
+++ b/sdks/go/pkg/beam/util/gcsx/gcs.go
@@ -25,6 +25,8 @@ import (
 
        "net/http"
 
+       "path"
+
        "golang.org/x/oauth2/google"
        "google.golang.org/api/googleapi"
        "google.golang.org/api/option"
@@ -137,3 +139,13 @@ func ParseObject(object string) (bucket, path string, err 
error) {
        // remove leading "/" in URL path
        return parsed.Host, parsed.Path[1:], nil
 }
+
+// Join joins a GCS path with an element. Preserves
+// the gs:// prefix.
+func Join(object string, elm string) string {
+       bucket, prefix, err := ParseObject(object)
+       if err != nil {
+               panic(err)
+       }
+       return MakeObject(bucket, path.Join(prefix, elm))
+}
diff --git a/sdks/go/test/integration/driver.go 
b/sdks/go/test/integration/driver.go
index 18460afee12..8daac8edb10 100644
--- a/sdks/go/test/integration/driver.go
+++ b/sdks/go/test/integration/driver.go
@@ -27,6 +27,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/memfs"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+       "github.com/apache/beam/sdks/go/test/integration/primitives"
        "github.com/apache/beam/sdks/go/test/integration/wordcount"
 )
 
@@ -57,10 +58,18 @@ func main() {
        pipelines := []namedPipeline{
                {"wordcount:memfs", wordcount.WordCount(old_pond, 
"+Qj8iAnV5BI2A4sbzUbb6Q==", 8)},
                {"wordcount:kinglear", 
wordcount.WordCount("gs://apache-beam-samples/shakespeare/kinglear.txt", 
"7ZCh5ih9m8IW1w+iS8sRKg==", 4749)},
+               {"pardo:multioutput", primitives.ParDoMultiOutput()},
+               // BEAM-3286: {"pardo:sideinput", primitives.ParDoSideInput()},
+               // BEAM-3286: {"pardo:kvsideinput", 
primitives.ParDoKVSideInput()},
+               {"cogbk:cogbk", primitives.CoGBK()},
+               {"flatten:flatten", primitives.Flatten()},
+               // {"flatten:dup", primitives.FlattenDup()},
        }
 
        re := regexp.MustCompile(*filter)
 
+       ctx := context.Background()
+
        ch := make(chan namedPipeline, len(pipelines))
        for _, np := range pipelines {
                if re.MatchString(np.name) {
@@ -69,8 +78,6 @@ func main() {
        }
        close(ch)
 
-       ctx := context.Background()
-
        var failures int32
        var wg sync.WaitGroup
        for i := 0; i < *parallel; i++ {
diff --git a/sdks/go/test/integration/primitives/cogbk.go 
b/sdks/go/test/integration/primitives/cogbk.go
new file mode 100644
index 00000000000..3564b074297
--- /dev/null
+++ b/sdks/go/test/integration/primitives/cogbk.go
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+       "fmt"
+
+       "github.com/apache/beam/sdks/go/pkg/beam"
+       "github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+)
+
+func genA(_ []byte, emit func(string, int)) {
+       emit("a", 1)
+       emit("a", 2)
+       emit("a", 3)
+       emit("b", 4)
+       emit("b", 5)
+       emit("c", 6)
+}
+
+func genB(_ []byte, emit func(string, int)) {
+       emit("a", 7)
+       emit("b", 8)
+       emit("d", 9)
+}
+
+func sum(nums func(*int) bool) int {
+       var ret, i int
+       for nums(&i) {
+               ret += i
+       }
+       return ret
+}
+
+func joinFn(key string, as, bs func(*int) bool, emit func(string, int)) {
+       emit(key, sum(as)+sum(bs))
+}
+
+func splitFn(key string, v int, a, b, c, d func(int)) {
+       switch key {
+       case "a":
+               a(v)
+       case "b":
+               b(v)
+       case "c":
+               c(v)
+       case "d":
+               d(v)
+       default:
+               panic(fmt.Sprintf("bad key: %v", key))
+       }
+}
+
+// CoGBK tests CoGBK.
+func CoGBK() *beam.Pipeline {
+       p, s := beam.NewPipelineWithRoot()
+
+       as := beam.ParDo(s, genA, beam.Impulse(s))
+       bs := beam.ParDo(s, genB, beam.Impulse(s))
+
+       grouped := beam.CoGroupByKey(s, as, bs)
+       joined := beam.ParDo(s, joinFn, grouped)
+       a, b, c, d := beam.ParDo4(s, splitFn, joined)
+
+       passert.Sum(s, a, "a", 1, 13)
+       passert.Sum(s, b, "b", 1, 17)
+       passert.Sum(s, c, "c", 1, 6)
+       passert.Sum(s, d, "d", 1, 9)
+
+       return p
+}
diff --git a/sdks/go/test/integration/primitives/cogbk_test.go 
b/sdks/go/test/integration/primitives/cogbk_test.go
new file mode 100644
index 00000000000..2bebdc43b2f
--- /dev/null
+++ b/sdks/go/test/integration/primitives/cogbk_test.go
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+       "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
+)
+
+func TestCoGBK(t *testing.T) {
+       if err := ptest.Run(CoGBK()); err != nil {
+               t.Error(err)
+       }
+}
diff --git a/sdks/go/test/integration/primitives/flatten.go 
b/sdks/go/test/integration/primitives/flatten.go
new file mode 100644
index 00000000000..34d40851c4d
--- /dev/null
+++ b/sdks/go/test/integration/primitives/flatten.go
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+       "github.com/apache/beam/sdks/go/pkg/beam"
+       "github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+)
+
+// Flatten tests flatten.
+func Flatten() *beam.Pipeline {
+       p, s := beam.NewPipelineWithRoot()
+
+       a := beam.Create(s, 1, 2, 3)
+       b := beam.Create(s, 4, 5, 6)
+       c := beam.Create(s, 7, 8, 9)
+
+       flat := beam.Flatten(s, a, b, c)
+       passert.Sum(s, flat, "flat", 9, 45)
+
+       return p
+}
+
+// FlattenDup tests flatten with the same input multiple times.
+func FlattenDup() *beam.Pipeline {
+       p, s := beam.NewPipelineWithRoot()
+
+       a := beam.Create(s, 1, 2, 3)
+       b := beam.Create(s, 4, 5, 6)
+
+       flat := beam.Flatten(s, a, b, a)
+       passert.Sum(s, flat, "flat", 9, 27)
+
+       return p
+}
diff --git a/sdks/go/test/integration/primitives/flatten_test.go 
b/sdks/go/test/integration/primitives/flatten_test.go
new file mode 100644
index 00000000000..04710e48d46
--- /dev/null
+++ b/sdks/go/test/integration/primitives/flatten_test.go
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+       "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
+)
+
+func TestFlatten(t *testing.T) {
+       if err := ptest.Run(Flatten()); err != nil {
+               t.Error(err)
+       }
+}
+
+func TestFlattenDup(t *testing.T) {
+       if err := ptest.Run(FlattenDup()); err != nil {
+               t.Error(err)
+       }
+}
diff --git a/sdks/go/test/integration/primitives/pardo.go 
b/sdks/go/test/integration/primitives/pardo.go
new file mode 100644
index 00000000000..ae7d9b25d77
--- /dev/null
+++ b/sdks/go/test/integration/primitives/pardo.go
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+       "github.com/apache/beam/sdks/go/pkg/beam"
+       "github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+)
+
+func emit3Fn(elm int, emit, emit2, emit3 func(int)) {
+       emit(elm + 1)
+       emit2(elm + 2)
+       emit3(elm + 3)
+}
+
+// ParDoMultiOutput tests a DoFn with multiple outputs.
+func ParDoMultiOutput() *beam.Pipeline {
+       p, s := beam.NewPipelineWithRoot()
+
+       in := beam.Create(s, 1)
+       emit1, emit2, emit3 := beam.ParDo3(s, emit3Fn, in)
+       passert.Sum(s, emit1, "emit1", 1, 2)
+       passert.Sum(s, emit2, "emit2", 1, 3)
+       passert.Sum(s, emit3, "emit3", 1, 4)
+
+       return p
+}
+
+func sumValuesFn(_ []byte, values func(*int) bool) int {
+       sum := 0
+       var i int
+       for values(&i) {
+               sum += i
+       }
+       return sum
+}
+
+// ParDoSideInput computes the sum of ints using a side input.
+func ParDoSideInput() *beam.Pipeline {
+       p, s := beam.NewPipelineWithRoot()
+
+       in := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9)
+       out := beam.ParDo(s, sumValuesFn, beam.Impulse(s), 
beam.SideInput{Input: in})
+       passert.Sum(s, out, "out", 1, 45)
+
+       return p
+}
+
+func sumKVValuesFn(_ []byte, values func(*int, *int) bool) int {
+       sum := 0
+       var i, k int
+       for values(&i, &k) {
+               sum += i
+               sum += k
+       }
+       return sum
+}
+
+// ParDoKVSideInput computes the sum of ints using a KV side input.
+func ParDoKVSideInput() *beam.Pipeline {
+       p, s := beam.NewPipelineWithRoot()
+
+       in := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9)
+       kv := beam.AddFixedKey(s, in) // i -> (0,i)
+       out := beam.ParDo(s, sumKVValuesFn, beam.Impulse(s), 
beam.SideInput{Input: kv})
+       passert.Sum(s, out, "out", 1, 45)
+
+       return p
+}
diff --git a/sdks/go/test/integration/primitives/pardo_test.go 
b/sdks/go/test/integration/primitives/pardo_test.go
new file mode 100644
index 00000000000..f837689868d
--- /dev/null
+++ b/sdks/go/test/integration/primitives/pardo_test.go
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+       "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
+)
+
+func TestParDoMultiOutput(t *testing.T) {
+       if err := ptest.Run(ParDoMultiOutput()); err != nil {
+               t.Error(err)
+       }
+}
+
+func TestParDoSideInput(t *testing.T) {
+       if err := ptest.Run(ParDoSideInput()); err != nil {
+               t.Error(err)
+       }
+}
+
+func TestParDoKVSideInput(t *testing.T) {
+       if err := ptest.Run(ParDoKVSideInput()); err != nil {
+               t.Error(err)
+       }
+}
diff --git a/sdks/go/test/regression/pardo.go b/sdks/go/test/regression/pardo.go
index 5ec1ef22991..e158e3b6d14 100644
--- a/sdks/go/test/regression/pardo.go
+++ b/sdks/go/test/regression/pardo.go
@@ -22,23 +22,73 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
 )
 
-func directCountFn(_ int, values func(*int) bool) (int, error) {
-       sum := 0
-       var i int
-       for values(&i) {
-               sum += i
-       }
-       return sum, nil
+func directFn(elm int) int {
+       return elm + 1
 }
 
-func emitCountFn(_ int, values func(*int) bool, emit func(int)) error {
+// DirectParDo tests direct form output DoFns.
+func DirectParDo() *beam.Pipeline {
+       p, s := beam.NewPipelineWithRoot()
+
+       direct := beam.ParDo(s, directFn, beam.Create(s, 1, 2, 3))
+       passert.Sum(s, direct, "direct", 3, 9)
+
+       return p
+}
+
+func emitFn(elm int, emit func(int)) {
+       emit(elm + 1)
+}
+
+// EmitParDo tests emit form output DoFns.
+func EmitParDo() *beam.Pipeline {
+       p, s := beam.NewPipelineWithRoot()
+
+       emit := beam.ParDo(s, emitFn, beam.Create(s, 1, 2, 3))
+       passert.Sum(s, emit, "emit", 3, 9)
+
+       return p
+}
+
+func emit2Fn(elm int, emit, emit2 func(int)) {
+       emit(elm + 1)
+       emit2(elm + 2)
+}
+
+// MultiEmitParDo tests double emit form output DoFns.
+func MultiEmitParDo() *beam.Pipeline {
+       p, s := beam.NewPipelineWithRoot()
+
+       emit1, emit2 := beam.ParDo2(s, emit2Fn, beam.Create(s, 1, 2, 3))
+       passert.Sum(s, emit1, "emit2_1", 3, 9)
+       passert.Sum(s, emit2, "emit2_2", 3, 12)
+
+       return p
+}
+
+func mixedFn(elm int, emit func(int)) int {
+       emit(elm + 2)
+       return elm + 1
+}
+
+// MixedOutputParDo tests mixed direct + emit form output DoFns.
+func MixedOutputParDo() *beam.Pipeline {
+       p, s := beam.NewPipelineWithRoot()
+
+       mixed1, mixed2 := beam.ParDo2(s, mixedFn, beam.Create(s, 1, 2, 3))
+       passert.Sum(s, mixed1, "mixed_1", 3, 9)
+       passert.Sum(s, mixed2, "mixed_2", 3, 12)
+
+       return p
+}
+
+func directCountFn(_ int, values func(*int) bool) (int, error) {
        sum := 0
        var i int
        for values(&i) {
                sum += i
        }
-       emit(sum)
-       return nil
+       return sum, nil
 }
 
 // DirectParDoAfterGBK generates a pipeline with a direct-form
@@ -53,6 +103,16 @@ func DirectParDoAfterGBK() *beam.Pipeline {
        return p
 }
 
+func emitCountFn(_ int, values func(*int) bool, emit func(int)) error {
+       sum := 0
+       var i int
+       for values(&i) {
+               sum += i
+       }
+       emit(sum)
+       return nil
+}
+
 // EmitParDoAfterGBK generates a pipeline with a emit-form
 // ParDo after a GBK. See: BEAM-3978 and BEAM-4175.
 func EmitParDoAfterGBK() *beam.Pipeline {
diff --git a/sdks/go/test/regression/pardo_test.go 
b/sdks/go/test/regression/pardo_test.go
index 423eb6e77ed..322dd69c6f7 100644
--- a/sdks/go/test/regression/pardo_test.go
+++ b/sdks/go/test/regression/pardo_test.go
@@ -21,6 +21,30 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
 )
 
+func TestDirectParDo(t *testing.T) {
+       if err := ptest.Run(DirectParDo()); err != nil {
+               t.Error(err)
+       }
+}
+
+func TestEmitParDo(t *testing.T) {
+       if err := ptest.Run(EmitParDo()); err != nil {
+               t.Error(err)
+       }
+}
+
+func TestMultiEmitParDo(t *testing.T) {
+       if err := ptest.Run(MultiEmitParDo()); err != nil {
+               t.Error(err)
+       }
+}
+
+func TestMixedOutputParDo(t *testing.T) {
+       if err := ptest.Run(MixedOutputParDo()); err != nil {
+               t.Error(err)
+       }
+}
+
 func TestDirectParDoAfterGBK(t *testing.T) {
        if err := ptest.Run(DirectParDoAfterGBK()); err != nil {
                t.Error(err)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 128811)
    Time Spent: 4.5h  (was: 4h 20m)

> Make Go Dataflow translation use protos directly
> ------------------------------------------------
>
>                 Key: BEAM-4813
>                 URL: https://issues.apache.org/jira/browse/BEAM-4813
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Henning Rohde
>            Assignee: Henning Rohde
>            Priority: Major
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> The Go SDK maintains 2 pipeline translations and keeps various tweaks in 
> sync. It would be better to remove the Dataflow one and extract a more 
> flexible (such as running as a separate proxy) translation from proto to 
> v1beta3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to