[ 
https://issues.apache.org/jira/browse/BEAM-3287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16288465#comment-16288465
 ] 

ASF GitHub Bot commented on BEAM-3287:
--------------------------------------

kennknowles closed pull request #4213: [BEAM-3287] Use model pipelines in Go SDK
URL: https://github.com/apache/beam/pull/4213
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/pkg/beam/core/graph/edge.go 
b/sdks/go/pkg/beam/core/graph/edge.go
index b940264fac5..e87286f23c4 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -163,11 +163,22 @@ type MultiEdge struct {
        Output []*Outbound
 }
 
-// ID returns the graph-local identifier for the scope.
+// ID returns the graph-local identifier for the edge.
 func (e *MultiEdge) ID() int {
        return e.id
 }
 
+// Name returns a not-necessarily-unique name for the edge.
+func (e *MultiEdge) Name() string {
+       if e.DoFn != nil {
+               return e.DoFn.Name()
+       }
+       if e.CombineFn != nil {
+               return e.CombineFn.Name()
+       }
+       return string(e.Op)
+}
+
 // Scope return the scope.
 func (e *MultiEdge) Scope() *Scope {
        return e.parent
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go 
b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
index 96c63140464..86698141306 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
@@ -13,35 +13,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package graphx
+package graphx_test
 
 import (
-       "fmt"
        "reflect"
        "testing"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
 )
 
-type symlookup bool
-
-func (s symlookup) Sym2Addr(name string) (uintptr, error) {
-       switch name {
-       case reflectx.FunctionName(dec):
-               return reflect.ValueOf(dec).Pointer(), nil
-       case reflectx.FunctionName(enc):
-               return reflect.ValueOf(enc).Pointer(), nil
-       default:
-               panic(fmt.Sprintf("bad name: %v", name))
-       }
-}
-
 func init() {
-       runtime.SymbolResolver = symlookup(false)
+       ptest.RegisterFn(dec)
+       ptest.RegisterFn(enc)
 }
 
 // TestMarshalUnmarshalCoders verifies that coders survive a proto roundtrip.
@@ -86,7 +74,7 @@ func TestMarshalUnmarshalCoders(t *testing.T) {
 
        for _, test := range tests {
                t.Run(test.name, func(t *testing.T) {
-                       coders, err := 
UnmarshalCoders(MarshalCoders([]*coder.Coder{test.c}))
+                       coders, err := 
graphx.UnmarshalCoders(graphx.MarshalCoders([]*coder.Coder{test.c}))
                        if err != nil {
                                t.Fatalf("Unmarshal(Marshal(%v)) failed: %v", 
test.c, err)
                        }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
new file mode 100644
index 00000000000..f8cfc69227a
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package graphx
+
+import (
+       "fmt"
+
+       "strconv"
+       "strings"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+       fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "github.com/golang/protobuf/proto"
+)
+
+const (
+       // Model constants
+
+       urnImpulse = "urn:beam:transform:impulse:v1"
+       urnParDo   = "urn:beam:transform:pardo:v1"
+       urnFlatten = "urn:beam:transform:flatten:v1"
+       urnGBK     = "urn:beam:transform:groupbykey:v1"
+       urnCombine = "urn:beam:transform:combine:v1"
+       urnWindow  = "urn:beam:transform:window:v1"
+
+       urnDataSource = "urn:org.apache.beam:source:runner:0.1"
+       urnDataSink   = "urn:org.apache.beam:sink:runner:0.1"
+
+       urnGlobalWindowsWindowFn = "beam:windowfn:global_windows:v0.1"
+
+       // SDK constants
+
+       // TODO: use "urn:beam:go:transform:dofn:v1" when the Dataflow runner
+       // uses the model pipeline and no longer falls back to Java.
+       urnDoFn = "urn:beam:dofn:javasdk:0.1"
+)
+
+// TODO(herohde) 11/6/2017: move some of the configuration into the graph 
during construction.
+
+// Options for marshalling a graph into a model pipeline.
+type Options struct {
+       // ContainerImageURL is the default environment container image.
+       ContainerImageURL string
+}
+
+// Marshal converts a graph to a model pipeline.
+func Marshal(edges []*graph.MultiEdge, opt *Options) (*pb.Pipeline, error) {
+       tree := NewScopeTree(edges)
+       EnsureUniqueNames(tree)
+
+       m := newMarshaller(opt.ContainerImageURL)
+
+       var roots []string
+       for _, edge := range tree.Edges {
+               roots = append(roots, m.addMultiEdge(edge))
+       }
+       for _, t := range tree.Children {
+               roots = append(roots, m.addScopeTree(t))
+       }
+
+       p := &pb.Pipeline{
+               Components:       m.build(),
+               RootTransformIds: roots,
+       }
+       return p, nil
+}
+
+type marshaller struct {
+       imageURL string
+
+       transforms   map[string]*pb.PTransform
+       pcollections map[string]*pb.PCollection
+       windowing    map[string]*pb.WindowingStrategy
+       environments map[string]*pb.Environment
+
+       coders *CoderMarshaller
+}
+
+func newMarshaller(imageURL string) *marshaller {
+       return &marshaller{
+               imageURL:     imageURL,
+               transforms:   make(map[string]*pb.PTransform),
+               pcollections: make(map[string]*pb.PCollection),
+               windowing:    make(map[string]*pb.WindowingStrategy),
+               environments: make(map[string]*pb.Environment),
+               coders:       NewCoderMarshaller(),
+       }
+}
+
+func (m *marshaller) build() *pb.Components {
+       return &pb.Components{
+               Transforms:          m.transforms,
+               Pcollections:        m.pcollections,
+               WindowingStrategies: m.windowing,
+               Environments:        m.environments,
+               Coders:              m.coders.Build(),
+       }
+}
+
+func (m *marshaller) addScopeTree(s *ScopeTree) string {
+       id := scopeID(s.Scope.Scope)
+       if _, exists := m.transforms[id]; exists {
+               return id
+       }
+
+       var subtransforms []string
+       for _, edge := range s.Edges {
+               subtransforms = append(subtransforms, m.addMultiEdge(edge))
+       }
+       for _, tree := range s.Children {
+               subtransforms = append(subtransforms, m.addScopeTree(tree))
+       }
+
+       // Compute the input/output for this scope:
+       //    inputs  := U(subinputs)\U(suboutputs)
+       //    outputs := U(suboutputs)\U(subinputs)
+       // where U is set union and \ is set subtraction.
+
+       in := make(map[string]bool)
+       out := make(map[string]bool)
+       for _, sid := range subtransforms {
+               inout(m.transforms[sid], in, out)
+       }
+
+       transform := &pb.PTransform{
+               UniqueName:    s.Scope.Name,
+               Subtransforms: subtransforms,
+               Inputs:        diff(in, out),
+               Outputs:       diff(out, in),
+       }
+       m.transforms[id] = transform
+       return id
+}
+
+// diff computes A\B and returns its keys as an identity map.
+func diff(a, b map[string]bool) map[string]string {
+       ret := make(map[string]string)
+       for key, _ := range a {
+               if !b[key] {
+                       ret[key] = key
+               }
+       }
+       return ret
+}
+
+// inout adds the input and output pcollection ids to the accumulators.
+func inout(transform *pb.PTransform, in, out map[string]bool) {
+       for _, col := range transform.GetInputs() {
+               in[col] = true
+       }
+       for _, col := range transform.GetOutputs() {
+               out[col] = true
+       }
+}
+
+func (m *marshaller) addMultiEdge(edge NamedEdge) string {
+       id := edgeID(edge.Edge)
+       if _, exists := m.transforms[id]; exists {
+               return id
+       }
+
+       inputs := make(map[string]string)
+       for i, in := range edge.Edge.Input {
+               m.addNode(in.From)
+               inputs[fmt.Sprintf("i%v", i)] = nodeID(in.From)
+       }
+       outputs := make(map[string]string)
+       for i, out := range edge.Edge.Output {
+               m.addNode(out.To)
+               outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
+       }
+
+       transform := &pb.PTransform{
+               UniqueName: edge.Name,
+               Spec:       m.makePayload(edge.Edge),
+               Inputs:     inputs,
+               Outputs:    outputs,
+       }
+       // TODO(herohde) 12/1/2017: set target for DataSource/DataSink
+
+       m.transforms[id] = transform
+       return id
+}
+
+func (m *marshaller) makePayload(edge *graph.MultiEdge) *pb.FunctionSpec {
+       switch edge.Op {
+       case graph.Impulse:
+               return &pb.FunctionSpec{Urn: urnImpulse}
+
+       case graph.DataSource:
+               payload := &fnpb.RemoteGrpcPort{
+                       ApiServiceDescriptor: &pb.ApiServiceDescriptor{
+                               Url: edge.Port.URL,
+                       },
+               }
+               return &pb.FunctionSpec{Urn: urnDataSource, Payload: 
protox.MustEncode(payload)}
+
+       case graph.DataSink:
+               payload := &fnpb.RemoteGrpcPort{
+                       ApiServiceDescriptor: &pb.ApiServiceDescriptor{
+                               Url: edge.Port.URL,
+                       },
+               }
+               return &pb.FunctionSpec{Urn: urnDataSink, Payload: 
protox.MustEncode(payload)}
+
+       case graph.ParDo, graph.Combine:
+               payload := &pb.ParDoPayload{
+                       DoFn: &pb.SdkFunctionSpec{
+                               Spec: &pb.FunctionSpec{
+                                       Urn:     urnDoFn,
+                                       Payload: 
[]byte(mustEncodeMultiEdgeBase64(edge)),
+                               },
+                               EnvironmentId: m.addDefaultEnv(),
+                       },
+               }
+               return &pb.FunctionSpec{Urn: urnParDo, Payload: 
protox.MustEncode(payload)}
+
+       case graph.Flatten:
+               return &pb.FunctionSpec{Urn: urnFlatten}
+
+       case graph.GBK:
+               return &pb.FunctionSpec{Urn: urnGBK}
+
+       default:
+               panic(fmt.Sprintf("Unexpected opcode: %v", edge.Op))
+       }
+}
+
+func (m *marshaller) addNode(n *graph.Node) string {
+       id := nodeID(n)
+       if _, exists := m.pcollections[id]; exists {
+               return id
+       }
+
+       // TODO(herohde) 11/15/2017: expose UniqueName to user. Handle 
unbounded and windowing.
+
+       col := &pb.PCollection{
+               UniqueName:          id,
+               CoderId:             m.coders.Add(n.Coder),
+               IsBounded:           pb.IsBounded_BOUNDED,
+               WindowingStrategyId: 
m.addWindowingStrategy(window.NewGlobalWindow()),
+       }
+       m.pcollections[id] = col
+       return id
+}
+
+func (m *marshaller) addDefaultEnv() string {
+       const id = "go"
+       if _, exists := m.environments[id]; !exists {
+               m.environments[id] = &pb.Environment{Url: m.imageURL}
+       }
+       return id
+}
+
+func (m *marshaller) addWindowingStrategy(w *window.Window) string {
+       if w.Kind() != window.GlobalWindow {
+               panic(fmt.Sprintf("Unsupported window type supplied: %v", w))
+       }
+
+       const id = "global"
+       if _, exists := m.windowing[id]; !exists {
+               wcid := m.coders.AddWindow(w)
+
+               ws := &pb.WindowingStrategy{
+                       WindowFn: &pb.SdkFunctionSpec{
+                               Spec: &pb.FunctionSpec{
+                                       Urn: urnGlobalWindowsWindowFn,
+                               },
+                       },
+                       MergeStatus:      pb.MergeStatus_NON_MERGING,
+                       AccumulationMode: pb.AccumulationMode_DISCARDING,
+                       WindowCoderId:    wcid,
+                       Trigger: &pb.Trigger{
+                               Trigger: &pb.Trigger_Default_{
+                                       Default: &pb.Trigger_Default{},
+                               },
+                       },
+                       OutputTime:      pb.OutputTime_END_OF_WINDOW,
+                       ClosingBehavior: pb.ClosingBehavior_EMIT_IF_NONEMPTY,
+                       AllowedLateness: 0,
+               }
+               m.windowing[id] = ws
+       }
+       return id
+}
+
+func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) string {
+       ref, err := EncodeMultiEdge(edge)
+       if err != nil {
+               panic(fmt.Sprintf("Failed to serialize %v: %v", edge, err))
+       }
+       return protox.MustEncodeBase64(ref)
+}
+
+// Unmarshal converts a model pipeline into a graph.
+func Unmarshal(p *pb.Pipeline) (*graph.Graph, error) {
+       u, s := newUnmarshaller(p.GetComponents())
+       for _, id := range p.GetRootTransformIds() {
+               if err := u.unmarshalTransform(s, id); err != nil {
+                       return nil, err
+               }
+       }
+       return u.build(), nil
+}
+
+type unmarshaller struct {
+       g *graph.Graph
+
+       nodes     map[string]*graph.Node
+       windowing map[string]*window.Window
+       coders    *CoderUnmarshaller
+
+       comp *pb.Components
+}
+
+func newUnmarshaller(comp *pb.Components) (*unmarshaller, *graph.Scope) {
+       u := &unmarshaller{
+               g:         graph.New(),
+               nodes:     make(map[string]*graph.Node),
+               windowing: make(map[string]*window.Window),
+               coders:    NewCoderUnmarshaller(comp.GetCoders()),
+               comp:      comp,
+       }
+       return u, u.g.Root()
+}
+
+func (u *unmarshaller) build() *graph.Graph {
+       return u.g
+}
+
+func (u *unmarshaller) unmarshalWindow(id string) (*window.Window, error) {
+       if w, exists := u.windowing[id]; exists {
+               return w, nil
+       }
+
+       ws, ok := u.comp.GetWindowingStrategies()[id]
+       if !ok {
+               return nil, fmt.Errorf("windowing strategy %v not found", id)
+       }
+       if urn := ws.GetWindowFn().GetSpec().GetUrn(); urn != 
urnGlobalWindowsWindowFn {
+               return nil, fmt.Errorf("unsupported window type: %v", urn)
+       }
+
+       w := window.NewGlobalWindow()
+       u.windowing[id] = w
+       return w, nil
+}
+
+func (u *unmarshaller) unmarshalNode(id string) (*graph.Node, error) {
+       if n, exists := u.nodes[id]; exists {
+               return n, nil
+       }
+
+       col, ok := u.comp.GetPcollections()[id]
+       if !ok {
+               return nil, fmt.Errorf("pcollection %v not found", id)
+       }
+       c, err := u.coders.Coder(col.CoderId)
+       if err != nil {
+               return nil, err
+       }
+
+       w := window.NewGlobalWindow()
+       if col.WindowingStrategyId != "" {
+               // TODO(herohde) 12/4/2017: seems to be optional or just not 
present through legacy Dataflow.
+
+               w, err = u.unmarshalWindow(col.WindowingStrategyId)
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       n := u.g.NewNode(c.T, w)
+       n.Coder = c
+
+       u.nodes[id] = n
+       return n, nil
+}
+
+func (u *unmarshaller) unmarshalTransform(scope *graph.Scope, id string) error 
{
+       transform, ok := u.comp.GetTransforms()[id]
+       if !ok {
+               return fmt.Errorf("transform %v not found", id)
+       }
+
+       if len(transform.GetSubtransforms()) > 0 {
+               // Composite transform. Ignore in/out.
+
+               scope = u.g.NewScope(scope, transform.GetUniqueName())
+               for _, cid := range transform.GetSubtransforms() {
+                       if err := u.unmarshalTransform(scope, cid); err != nil {
+                               return err
+                       }
+               }
+               return nil
+       }
+
+       urn := transform.GetSpec().GetUrn()
+       payload := transform.GetSpec().GetPayload()
+
+       edge := u.g.NewEdge(scope)
+
+       switch urn {
+       case urnImpulse:
+               edge.Op = graph.Impulse
+               edge.Output = makeEmptyOutbound(1)
+
+       case urnDataSource:
+               port, err := unmarshalPort(payload)
+               if err != nil {
+                       return err
+               }
+               edge.Op = graph.DataSource
+               edge.Port = port
+               edge.Output = makeEmptyOutbound(1)
+
+               for key := range transform.GetOutputs() {
+                       edge.Target = &graph.Target{ID: id, Name: key}
+               }
+
+       case urnDataSink:
+               port, err := unmarshalPort(payload)
+               if err != nil {
+                       return err
+               }
+
+               edge.Op = graph.DataSink
+               edge.Port = port
+               edge.Input = makeEmptyInbound(1)
+
+               for key := range transform.GetInputs() {
+                       edge.Target = &graph.Target{ID: id, Name: key}
+               }
+
+       case urnParDo:
+               var pardo pb.ParDoPayload
+               if err := proto.Unmarshal(payload, &pardo); err != nil {
+                       return err
+               }
+               var me v1.MultiEdge
+               if err := 
protox.DecodeBase64(string(pardo.GetDoFn().GetSpec().GetPayload()), &me); err 
!= nil {
+                       return err
+               }
+               op, fn, in, out, err := DecodeMultiEdge(&me)
+               if err != nil {
+                       return err
+               }
+
+               edge.Op = op
+               edge.Input = in
+               edge.Output = out
+
+               switch edge.Op {
+               case graph.ParDo:
+                       edge.DoFn, err = graph.AsDoFn(fn)
+                       if err != nil {
+                               return err
+                       }
+               case graph.Combine:
+                       edge.CombineFn, err = graph.AsCombineFn(fn)
+                       if err != nil {
+                               return err
+                       }
+               default:
+                       panic(fmt.Sprintf("Opcode should be one of ParDo or 
Combine, but it is: %v", edge.Op))
+               }
+
+       case urnDoFn:
+               // TODO(herohde) 12/4/2017: we see DoFns directly with 
Dataflow. Handle that
+               // case here, for now, so that the harness can use this logic.
+
+               var me v1.MultiEdge
+               if err := protox.DecodeBase64(string(payload), &me); err != nil 
{
+                       return err
+               }
+               op, fn, in, out, err := DecodeMultiEdge(&me)
+               if err != nil {
+                       return err
+               }
+
+               edge.Op = op
+               edge.Input = in
+               edge.Output = out
+
+               switch edge.Op {
+               case graph.ParDo:
+                       edge.DoFn, err = graph.AsDoFn(fn)
+                       if err != nil {
+                               return err
+                       }
+               case graph.Combine:
+                       edge.CombineFn, err = graph.AsCombineFn(fn)
+                       if err != nil {
+                               return err
+                       }
+               default:
+                       panic(fmt.Sprintf("Opcode should be one of ParDo or 
Combine, but it is: %v", edge.Op))
+               }
+
+       case urnFlatten:
+               edge.Op = graph.Flatten
+               edge.Input = makeEmptyInbound(len(transform.GetInputs()))
+               edge.Output = makeEmptyOutbound(1)
+
+       case urnGBK:
+               edge.Op = graph.GBK
+               edge.Input = makeEmptyInbound(1)
+               edge.Output = makeEmptyOutbound(1)
+
+       default:
+               panic(fmt.Sprintf("Unexpected transform URN: %v", urn))
+       }
+
+       if err := u.linkInbound(transform.GetInputs(), edge.Input); err != nil {
+               return err
+       }
+       return u.linkOutbound(transform.GetOutputs(), edge.Output)
+}
+
+func (u *unmarshaller) linkInbound(m map[string]string, in []*graph.Inbound) 
error {
+       nodes, err := u.unmarshalKeyedNodes(m)
+       if err != nil {
+               return err
+       }
+       if len(nodes) != len(in) {
+               return fmt.Errorf("unexpected number of inputs: %v, want %v", 
len(nodes), len(in))
+       }
+       for i := 0; i < len(in); i++ {
+               in[i].From = nodes[i]
+               if in[i].Type == nil {
+                       in[i].Type = nodes[i].Type()
+               }
+       }
+       return nil
+}
+
+func (u *unmarshaller) linkOutbound(m map[string]string, out 
[]*graph.Outbound) error {
+       nodes, err := u.unmarshalKeyedNodes(m)
+       if err != nil {
+               return err
+       }
+       if len(nodes) != len(out) {
+               return fmt.Errorf("unexpected number of outputs: %v, want %v", 
len(nodes), len(out))
+       }
+       for i := 0; i < len(out); i++ {
+               out[i].To = nodes[i]
+               if out[i].Type == nil {
+                       out[i].Type = nodes[i].Type()
+               }
+       }
+       return nil
+}
+
+func (u *unmarshaller) unmarshalKeyedNodes(m map[string]string) 
([]*graph.Node, error) {
+       if len(m) == 0 {
+               return nil, nil
+       }
+
+       // (1) Compute index. If generated by the marshaller above, we have
+       // an "iN" name that directly indicates the position.
+
+       index := make(map[string]int)
+       nodes := make(map[string]*graph.Node)
+       complete := true
+
+       for key, value := range m {
+               if i, err := strconv.Atoi(strings.TrimPrefix(key, "i")); 
!strings.HasPrefix(key, "i") || err != nil {
+                       complete = false
+                       if key == "bogus" {
+                               continue // Ignore special bogus node for 
legacy Dataflow.
+                       }
+               } else {
+                       index[key] = i
+               }
+               n, err := u.unmarshalNode(value)
+               if err != nil {
+                       return nil, err
+               }
+               nodes[key] = n
+       }
+
+       // (2) Impose order, if present, on nodes.
+
+       if !complete {
+               // Inserted node or fallback. Assume any order is ok.
+               var ret []*graph.Node
+               for _, n := range nodes {
+                       ret = append(ret, n)
+               }
+               return ret, nil
+       }
+
+       ret := make([]*graph.Node, len(m), len(m))
+       for key, n := range nodes {
+               ret[index[key]] = n
+       }
+       return ret, nil
+
+}
+
+func unmarshalPort(data []byte) (*graph.Port, error) {
+       var port fnpb.RemoteGrpcPort
+       if err := proto.Unmarshal(data, &port); err != nil {
+               return nil, err
+       }
+       return &graph.Port{
+               URL: port.GetApiServiceDescriptor().GetUrl(),
+       }, nil
+}
+
+func makeEmptyInbound(n int) []*graph.Inbound {
+       var ret []*graph.Inbound
+       for i := 0; i < n; i++ {
+               ret = append(ret, &graph.Inbound{Kind: graph.Main})
+       }
+       return ret
+}
+
+func makeEmptyOutbound(n int) []*graph.Outbound {
+       var ret []*graph.Outbound
+       for i := 0; i < n; i++ {
+               ret = append(ret, &graph.Outbound{})
+       }
+       return ret
+}
+
+func nodeID(n *graph.Node) string {
+       return fmt.Sprintf("n%v", n.ID())
+}
+
+func scopeID(s *graph.Scope) string {
+       return fmt.Sprintf("s%v", s.ID())
+}
+
+func edgeID(e *graph.MultiEdge) string {
+       return fmt.Sprintf("e%v", e.ID())
+}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
new file mode 100644
index 00000000000..4fb1505f927
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package graphx_test
+
+import (
+       "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
+)
+
+func init() {
+       ptest.RegisterFn(pickFn)
+}
+
+func pickFn(a int, small, big func(int)) {
+       if a < 3 {
+               small(a)
+       } else {
+               big(a)
+       }
+}
+
+func pick(t *testing.T, g *graph.Graph) *graph.MultiEdge {
+       dofn, err := graph.NewDoFn(pickFn)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       in := g.NewNode(intT(), window.NewGlobalWindow())
+       in.Coder = intCoder()
+
+       e, err := graph.NewParDo(g, g.Root(), dofn, []*graph.Node{in}, nil)
+       if err != nil {
+               t.Fatal(err)
+       }
+       e.Output[0].To.Coder = intCoder()
+       e.Output[1].To.Coder = intCoder()
+       return e
+}
+
+func intT() typex.FullType {
+       return typex.NewW(typex.New(reflectx.Int))
+}
+
+func intCoder() *coder.Coder {
+       return coder.NewW(custom("int", reflectx.Int), window.NewGlobalWindow())
+}
+
+// TestParDoRoundtrip verifies that ParDo can be serialized and deserialized.
+func TestParDoRoundtrip(t *testing.T) {
+       g := graph.New()
+       pick(t, g)
+
+       edges, _, err := g.Build()
+       if err != nil {
+               t.Fatal(err)
+       }
+       if len(edges) != 1 {
+               t.Fatal("expected a single edge")
+       }
+
+       edges2 := roundtrip(t, edges)
+
+       if !areEdgesSimilar(edges[0], edges2[0]) {
+               t.Errorf("bad ParDo translation: %v, want %v", edges2[0], 
edges[0])
+       }
+}
+
+func roundtrip(t *testing.T, edges []*graph.MultiEdge) []*graph.MultiEdge {
+       p, err := graphx.Marshal(edges, &graphx.Options{ContainerImageURL: 
"foo"})
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       // t.Logf("P: %v", proto.MarshalTextString(p))
+
+       g2, err := graphx.Unmarshal(p)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       edges2, _, err := g2.Build()
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       if len(edges2) != len(edges) {
+               t.Fatalf("Graph sizes = %v, want %v", len(edges2), len(edges))
+       }
+       return edges2
+}
+
+// areEdgesSimilar compares two edges from different graphs. It returns true 
iff
+// they have identical opcode, function and shape.
+func areEdgesSimilar(a, b *graph.MultiEdge) bool {
+       if a.Op != b.Op {
+               return false
+       }
+       if a.Name() != b.Name() {
+               return false
+       }
+
+       if len(a.Input) != len(b.Input) {
+               return false
+       }
+       for i, in := range a.Input {
+               in2 := b.Input[i]
+
+               if in.Kind != in2.Kind {
+                       return false
+               }
+               if !in.From.Coder.Equals(in2.From.Coder) {
+                       return false
+               }
+       }
+
+       if len(a.Output) != len(b.Output) {
+               return false
+       }
+       for i, out := range a.Output {
+               out2 := b.Output[i]
+
+               if !out.To.Coder.Equals(out2.To.Coder) {
+                       return false
+               }
+       }
+
+       if a.Op == graph.DataSink || a.Op == graph.DataSource {
+               if a.Port.URL != b.Port.URL {
+                       return false
+               }
+       }
+
+       return true
+}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/tree.go 
b/sdks/go/pkg/beam/core/runtime/graphx/tree.go
new file mode 100644
index 00000000000..3e3b925dc25
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/graphx/tree.go
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package graphx
+
+import (
+       "fmt"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+)
+
// NamedEdge is a named MultiEdge.
type NamedEdge struct {
	// Name is the display name chosen for the edge; it need not be unique.
	Name string
	// Edge is the underlying MultiEdge being named.
	Edge *graph.MultiEdge
}

// NamedScope is a named Scope.
type NamedScope struct {
	// Name is the display name chosen for the scope; it need not be unique.
	Name  string
	// Scope is the underlying Scope being named.
	Scope *graph.Scope
}
+
// ScopeTree is a convenient representation of the Scope-structure as a tree.
// Each ScopeTree may also be a subtree of another ScopeTree. The tree structure
// respects the underlying Scope structure, i.e., if Scope 'a' has a parent
// 'b' then the ScopeTree for 'b' must have the ScopeTree for 'a' as a child.
type ScopeTree struct {
	// Scope is the named scope at the root of the (sub)tree.
	Scope NamedScope
	// Edges are the named edges directly under this scope.
	Edges []NamedEdge

	// Children are the scopes directly under this scope.
	Children []*ScopeTree
}
+
+// NewScopeTree computes the ScopeTree for a set of edges.
+func NewScopeTree(edges []*graph.MultiEdge) *ScopeTree {
+       t := newTreeBuilder()
+       for _, edge := range edges {
+               t.addEdge(edge)
+       }
+       return t.root
+}
+
// treeBuilder is a builder of a ScopeTree from any set of edges and
// scopes from the same graph.
type treeBuilder struct {
	// root is the tree of the unique parent-less scope, set by addScope
	// when that scope is first encountered.
	root    *ScopeTree
	// id2tree memoizes the ScopeTree constructed for each scope ID, so
	// each scope maps to exactly one tree node.
	id2tree map[int]*ScopeTree
}
+
+func newTreeBuilder() *treeBuilder {
+       return &treeBuilder{
+               id2tree: make(map[int]*ScopeTree),
+       }
+}
+
+func (t *treeBuilder) addEdge(edge *graph.MultiEdge) {
+       tree := t.addScope(edge.Scope())
+       tree.Edges = append(tree.Edges, NamedEdge{Name: edge.Name(), Edge: 
edge})
+}
+
+func (t *treeBuilder) addScope(s *graph.Scope) *ScopeTree {
+       if tree, exists := t.id2tree[s.ID()]; exists {
+               return tree
+       }
+
+       tree := &ScopeTree{Scope: NamedScope{Name: s.Label, Scope: s}}
+       t.id2tree[s.ID()] = tree
+
+       if s.Parent == nil {
+               t.root = tree
+       } else {
+               parent := t.addScope(s.Parent)
+               parent.Children = append(parent.Children, tree)
+       }
+       return tree
+}
+
+// EnsureUniqueNames ensures that each name is unique within each ScopeTree
+// recursively. Any conflict is resolved by adding '1, '2, etc to the name.
+func EnsureUniqueNames(tree *ScopeTree) {
+       seen := make(map[string]bool)
+       for _, edge := range tree.Edges {
+               edge.Name = findFreeName(seen, edge.Name)
+               seen[edge.Name] = true
+       }
+
+       for _, s := range tree.Children {
+               EnsureUniqueNames(s)
+
+               s.Scope.Name = findFreeName(seen, s.Scope.Name)
+               seen[s.Scope.Name] = true
+       }
+}
+
// findFreeName returns name unchanged when it has not been seen; otherwise it
// returns the first variant of the form name'i (i = 1, 2, ...) absent from
// the seen set.
func findFreeName(seen map[string]bool, name string) string {
	if seen[name] {
		for i := 1; ; i++ {
			candidate := fmt.Sprintf("%v'%v", name, i)
			if !seen[candidate] {
				return candidate
			}
		}
	}
	return name
}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go 
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index fc03fcf987e..76d516fba02 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -27,8 +27,10 @@ import (
        "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
-       pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+       fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/runners/direct"
        "github.com/golang/protobuf/proto"
        "google.golang.org/grpc"
@@ -54,7 +56,7 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
        }
        defer conn.Close()
 
-       client, err := pb.NewBeamFnControlClient(conn).Control(ctx)
+       client, err := fnpb.NewBeamFnControlClient(conn).Control(ctx)
        if err != nil {
                return fmt.Errorf("Failed to connect to control service: %v", 
err)
        }
@@ -64,7 +66,7 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
        // Each ProcessBundle is a sub-graph of the original one.
 
        var wg sync.WaitGroup
-       respc := make(chan *pb.InstructionResponse, 100)
+       respc := make(chan *fnpb.InstructionResponse, 100)
 
        wg.Add(1)
        go func() {
@@ -94,7 +96,7 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
                                recordFooter()
                                return nil
                        }
-                       return fmt.Errorf("Recv failed: %v", err)
+                       return fmt.Errorf("recv failed: %v", err)
                }
 
                log.Debugf(ctx, "RECV: %v", proto.MarshalTextString(req))
@@ -127,7 +129,7 @@ type control struct {
        // TODO: running pipelines
 }
 
-func (c *control) handleInstruction(ctx context.Context, req 
*pb.InstructionRequest) *pb.InstructionResponse {
+func (c *control) handleInstruction(ctx context.Context, req 
*fnpb.InstructionRequest) *fnpb.InstructionResponse {
        id := req.GetInstructionId()
        ctx = context.WithValue(ctx, instKey, id)
 
@@ -136,7 +138,22 @@ func (c *control) handleInstruction(ctx context.Context, 
req *pb.InstructionRequ
                msg := req.GetRegister()
 
                for _, desc := range msg.GetProcessBundleDescriptor() {
-                       g, err := translate(desc)
+                       var roots []string
+                       for id, _ := range desc.GetTransforms() {
+                               roots = append(roots, id)
+                       }
+                       p := &pb.Pipeline{
+                               Components: &pb.Components{
+                                       Transforms:          desc.Transforms,
+                                       Pcollections:        desc.Pcollections,
+                                       Coders:              desc.Coders,
+                                       WindowingStrategies: 
desc.WindowingStrategies,
+                                       Environments:        desc.Environments,
+                               },
+                               RootTransformIds: roots,
+                       }
+
+                       g, err := graphx.Unmarshal(p)
                        if err != nil {
                                return fail(id, "Invalid bundle desc: %v", err)
                        }
@@ -144,10 +161,10 @@ func (c *control) handleInstruction(ctx context.Context, 
req *pb.InstructionRequ
                        log.Debugf(ctx, "Added subgraph %v:\n %v", 
desc.GetId(), g)
                }
 
-               return &pb.InstructionResponse{
+               return &fnpb.InstructionResponse{
                        InstructionId: id,
-                       Response: &pb.InstructionResponse_Register{
-                               Register: &pb.RegisterResponse{},
+                       Response: &fnpb.InstructionResponse_Register{
+                               Register: &fnpb.RegisterResponse{},
                        },
                }
 
@@ -175,10 +192,10 @@ func (c *control) handleInstruction(ctx context.Context, 
req *pb.InstructionRequ
                        return fail(id, "Execute failed: %v", err)
                }
 
-               return &pb.InstructionResponse{
+               return &fnpb.InstructionResponse{
                        InstructionId: id,
-                       Response: &pb.InstructionResponse_ProcessBundle{
-                               ProcessBundle: &pb.ProcessBundleResponse{},
+                       Response: &fnpb.InstructionResponse_ProcessBundle{
+                               ProcessBundle: &fnpb.ProcessBundleResponse{},
                        },
                }
 
@@ -187,10 +204,10 @@ func (c *control) handleInstruction(ctx context.Context, 
req *pb.InstructionRequ
 
                log.Debugf(ctx, "PB Progress: %v", msg)
 
-               return &pb.InstructionResponse{
+               return &fnpb.InstructionResponse{
                        InstructionId: id,
-                       Response: &pb.InstructionResponse_ProcessBundleProgress{
-                               ProcessBundleProgress: 
&pb.ProcessBundleProgressResponse{},
+                       Response: 
&fnpb.InstructionResponse_ProcessBundleProgress{
+                               ProcessBundleProgress: 
&fnpb.ProcessBundleProgressResponse{},
                        },
                }
 
@@ -199,10 +216,10 @@ func (c *control) handleInstruction(ctx context.Context, 
req *pb.InstructionRequ
 
                log.Debugf(ctx, "PB Split: %v", msg)
 
-               return &pb.InstructionResponse{
+               return &fnpb.InstructionResponse{
                        InstructionId: id,
-                       Response: &pb.InstructionResponse_ProcessBundleSplit{
-                               ProcessBundleSplit: 
&pb.ProcessBundleSplitResponse{},
+                       Response: &fnpb.InstructionResponse_ProcessBundleSplit{
+                               ProcessBundleSplit: 
&fnpb.ProcessBundleSplitResponse{},
                        },
                }
 
@@ -211,10 +228,10 @@ func (c *control) handleInstruction(ctx context.Context, 
req *pb.InstructionRequ
        }
 }
 
-func fail(id, format string, args ...interface{}) *pb.InstructionResponse {
-       dummy := &pb.InstructionResponse_Register{Register: 
&pb.RegisterResponse{}}
+func fail(id, format string, args ...interface{}) *fnpb.InstructionResponse {
+       dummy := &fnpb.InstructionResponse_Register{Register: 
&fnpb.RegisterResponse{}}
 
-       return &pb.InstructionResponse{
+       return &fnpb.InstructionResponse{
                InstructionId: id,
                Error:         fmt.Sprintf(format, args...),
                Response:      dummy,
diff --git a/sdks/go/pkg/beam/core/runtime/harness/logging.go 
b/sdks/go/pkg/beam/core/runtime/harness/logging.go
index 31ae829f745..68e225e246d 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/logging.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/logging.go
@@ -147,5 +147,5 @@ func (w *remoteWriter) connect(ctx context.Context) error {
 
                // fmt.Fprintf(os.Stderr, "SENT: %v\n", msg)
        }
-       return fmt.Errorf("Internal: buffer closed?")
+       return fmt.Errorf("internal: buffer closed?")
 }
diff --git a/sdks/go/pkg/beam/core/runtime/harness/translate.go 
b/sdks/go/pkg/beam/core/runtime/harness/translate.go
deleted file mode 100644
index 9838ce94a7a..00000000000
--- a/sdks/go/pkg/beam/core/runtime/harness/translate.go
+++ /dev/null
@@ -1,343 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package harness
-
-import (
-       "errors"
-       "fmt"
-
-       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
-       fnapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
-       rnapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
-)
-
-var (
-       errRootlessBundle = errors.New("invalid bundle: no roots supplied")
-       errBundleHasCycle = errors.New("bundle contained a cycle")
-)
-
-// Tracks provenance information of PCollections to help linking nodes
-// to their predecessors.
-type pCollInfo struct {
-       xid   string                // constructing transform ID
-       xform *rnapi_pb.PTransform  // constructing transform
-       pcoll *rnapi_pb.PCollection // collection metadata
-}
-
-// lookups on PCollections by their ID.
-type pCollMap map[string]*pCollInfo
-
-type nodeID struct {
-       StepID string
-       Key    string
-}
-
-// topologicalSort produces a list of topologically sorted PTransform ids and
-// a PCollection lookup structure for the supplied bundle. The function will
-// fail if the graph has cycles.
-func topologicalSort(bundle *fnapi_pb.ProcessBundleDescriptor) (sortedIds 
[]string, colls pCollMap, err error) {
-       colls = make(pCollMap)
-       for id, coll := range bundle.GetPcollections() {
-               colls[id] = &pCollInfo{pcoll: coll}
-       }
-
-       adjs := make(map[string]int)
-
-       for id, transform := range bundle.GetTransforms() {
-               // Populate the adjacency map
-               in := len(transform.GetInputs())
-               adjs[id] = in
-               if in == 0 {
-                       // Root node identified.
-                       sortedIds = append(sortedIds, id)
-               }
-       }
-
-       xforms := bundle.GetTransforms()
-       if len(xforms) == 0 {
-               return nil, nil, errRootlessBundle
-       }
-
-       frontier := append([]string(nil), sortedIds...)
-
-       for {
-               for _, id := range frontier {
-                       frontier = frontier[1:]
-                       xform := xforms[id]
-                       for _, out := range xform.GetOutputs() {
-                               // Look for consumer xforms that take this 
output as an input
-                               for cid, c := range xforms {
-                                       for _, in := range c.GetInputs() {
-                                               if in == out {
-                                                       // They are connected. 
Decrement the adjacency count of this xform
-                                                       adjs[cid] = adjs[cid] - 
1
-                                                       // Update the 
PCollection metadata to record the producing transform.
-                                                       colls[in].xid, 
colls[in].xform = id, xforms[id]
-
-                                                       if adjs[cid] == 0 {
-                                                               // Add it to 
the list
-                                                               frontier = 
append(frontier, cid)
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               }
-               // Add any completed nodes to the sorted list
-               sortedIds = append(sortedIds, frontier...)
-
-               // We're done when there are no more nodes to explore.
-               if len(frontier) == 0 {
-                       break
-               }
-       }
-
-       if len(sortedIds) != len(bundle.GetTransforms()) {
-               return nil, nil, errBundleHasCycle
-       }
-
-       return sortedIds, colls, nil
-}
-
-// translate translates a ProcessBundleDescriptor to a sub-graph that can run 
bundles.
-func translate(bundle *fnapi_pb.ProcessBundleDescriptor) (*graph.Graph, error) 
{
-       // NOTE: we will see only graph fragments w/o GBK, IMPULSE or FLATTEN, 
which
-       // are handled by the service.
-
-       // The incoming bundle is an unsorted collection of data. By applying a 
topological sort
-       // we can make a single linear pass to convert to the internal runner 
representation.
-       sortedIds, colls, err := topologicalSort(bundle)
-       if err != nil {
-               return nil, err
-       }
-
-       coders := graphx.NewCoderUnmarshaller(bundle.GetCoders())
-
-       g := graph.New()
-       nodes := make(map[nodeID]*graph.Node)
-       xforms := bundle.GetTransforms()
-
-       for _, id := range sortedIds {
-               transform := xforms[id]
-               spec := transform.GetSpec()
-               //log.Printf("SPEC: %v %v", id, transform.GetSpec())
-               switch spec.GetUrn() {
-               case "urn:org.apache.beam:source:java:0.1": // using Java's for 
now.
-                       var me v1.MultiEdge
-                       if err := 
protox.DecodeBase64(string(spec.GetPayload()), &me); err != nil {
-                               return nil, err
-                       }
-
-                       var fn *graph.Fn
-                       edge := g.NewEdge(g.Root())
-                       edge.Op, fn, edge.Input, edge.Output, err = 
graphx.DecodeMultiEdge(&me)
-                       if err != nil {
-                               return nil, err
-                       }
-                       edge.DoFn, err = graph.AsDoFn(fn)
-                       if err != nil {
-                               return nil, err
-                       }
-                       if err := link(g, nodes, coders, transform, id, edge, 
colls); err != nil {
-                               return nil, err
-                       }
-
-               case "urn:beam:dofn:javasdk:0.1": // We are using Java's for 
now.
-                       var me v1.MultiEdge
-                       if err := 
protox.DecodeBase64(string(spec.GetPayload()), &me); err != nil {
-                               return nil, err
-                       }
-
-                       var fn *graph.Fn
-                       edge := g.NewEdge(g.Root())
-                       edge.Op, fn, edge.Input, edge.Output, err = 
graphx.DecodeMultiEdge(&me)
-                       if err != nil {
-                               return nil, err
-                       }
-                       switch edge.Op {
-                       case graph.ParDo:
-                               edge.DoFn, err = graph.AsDoFn(fn)
-                               if err != nil {
-                                       return nil, err
-                               }
-                       case graph.Combine:
-                               edge.CombineFn, err = graph.AsCombineFn(fn)
-                               if err != nil {
-                                       return nil, err
-                               }
-                       default:
-                               panic(fmt.Sprintf("Opcode should be one of 
ParDo or Combine, but it is: %v", edge.Op))
-                       }
-                       if err := link(g, nodes, coders, transform, id, edge, 
colls); err != nil {
-                               return nil, err
-                       }
-
-               case "urn:org.apache.beam:source:runner:0.1":
-                       port, err := translatePort(spec.GetPayload())
-                       if err != nil {
-                               return nil, err
-                       }
-
-                       if size := len(transform.GetOutputs()); size != 1 {
-                               return nil, fmt.Errorf("expected 1 output, got 
%v", size)
-                       }
-                       var target *graph.Target
-                       for key := range transform.GetOutputs() {
-                               target = &graph.Target{ID: id, Name: key}
-                       }
-
-                       edge := g.NewEdge(g.Root())
-                       edge.Op = graph.DataSource
-                       edge.Port = port
-                       edge.Target = target
-                       edge.Output = []*graph.Outbound{{Type: nil}}
-
-                       if err := linkOutbound(g, nodes, coders, transform, id, 
edge, colls); err != nil {
-                               return nil, err
-                       }
-                       edge.Output[0].Type = edge.Output[0].To.Coder.T
-
-               case "urn:org.apache.beam:sink:runner:0.1":
-                       port, err := translatePort(spec.GetPayload())
-                       if err != nil {
-                               return nil, err
-                       }
-
-                       if size := len(transform.GetInputs()); size != 1 {
-                               return nil, fmt.Errorf("expected 1 input, got 
%v", size)
-                       }
-                       var target *graph.Target
-                       for key := range transform.GetInputs() {
-                               target = &graph.Target{ID: id, Name: key}
-                       }
-
-                       edge := g.NewEdge(g.Root())
-                       edge.Op = graph.DataSink
-                       edge.Port = port
-                       edge.Target = target
-                       edge.Input = []*graph.Inbound{{Type: nil}}
-
-                       if err := linkInbound(nodes, transform, edge, colls); 
err != nil {
-                               return nil, err
-                       }
-                       edge.Input[0].Type = edge.Input[0].From.Coder.T
-
-               default:
-                       return nil, fmt.Errorf("unexpected opcode: %v", spec)
-               }
-       }
-       return g, nil
-}
-
-func translatePort(data []byte) (*graph.Port, error) {
-       var port fnapi_pb.RemoteGrpcPort
-       if err := proto.Unmarshal(data, &port); err != nil {
-               return nil, err
-       }
-       return &graph.Port{
-               URL: port.GetApiServiceDescriptor().GetUrl(),
-       }, nil
-}
-
-func link(g *graph.Graph, nodes map[nodeID]*graph.Node, coders 
*graphx.CoderUnmarshaller, transform *rnapi_pb.PTransform, tid string, edge 
*graph.MultiEdge, colls pCollMap) error {
-       if err := linkInbound(nodes, transform, edge, colls); err != nil {
-               return err
-       }
-       return linkOutbound(g, nodes, coders, transform, tid, edge, colls)
-}
-
-func linkInbound(nodes map[nodeID]*graph.Node, transform *rnapi_pb.PTransform, 
edge *graph.MultiEdge, colls pCollMap) error {
-       from := translateInputs(transform, colls)
-       if len(from) != len(edge.Input) {
-               return fmt.Errorf("unexpected number of inputs: %v, want %v", 
len(from), len(edge.Input))
-       }
-       for i := 0; i < len(edge.Input); i++ {
-               edge.Input[i].From = nodes[from[i]]
-       }
-       return nil
-}
-
-func linkOutbound(g *graph.Graph, nodes map[nodeID]*graph.Node, coders 
*graphx.CoderUnmarshaller, transform *rnapi_pb.PTransform, tid string, edge 
*graph.MultiEdge, colls pCollMap) error {
-       to := translateOutputs(transform, tid, colls)
-       if len(to) != len(edge.Output) {
-               return fmt.Errorf("unexpected number of outputs: %v, want %v", 
len(to), len(edge.Output))
-       }
-
-       w := window.NewGlobalWindow()
-       if len(edge.Input) > 0 {
-               w = edge.Input[0].From.Window()
-       }
-       for i := 0; i < len(edge.Output); i++ {
-               c, err := coders.Coder(to[i].Coder)
-               if err != nil {
-                       return err
-               }
-
-               n := g.NewNode(c.T, w)
-               n.Coder = c
-               nodes[to[i].NodeID] = n
-
-               edge.Output[i].To = n
-       }
-       return nil
-}
-
-func translateInputs(transform *rnapi_pb.PTransform, colls pCollMap) []nodeID {
-       var from []nodeID
-
-       for _, in := range transform.GetInputs() {
-               // The runner API doesn't store the bidirectional relationship 
of nodes.
-               // We identify the data by working backwards to the 
PCollection, then
-               // consult our PCollection map to get info about the producing 
PTransform.
-               // Since each PTransform may produce many outputs, we look at 
all of them
-               // to find the output matching our input identifier.
-               fid := colls[in].xid
-               for okey, ocol := range colls[in].xform.GetOutputs() {
-                       if ocol == in {
-                               id := nodeID{fid, okey}
-                               from = append(from, id)
-                       }
-               }
-       }
-       return from
-}
-
-type output struct {
-       NodeID nodeID
-       Coder  string
-}
-
-func translateOutputs(transform *rnapi_pb.PTransform, tid string, colls 
pCollMap) []output {
-       var to []output
-
-       for key, col := range transform.GetOutputs() {
-               if key == "bogus" {
-                       continue // NOTE: remove bogus output
-               }
-
-               // TODO(herohde) 6/26/2017: we need to reorder output
-
-               coder := colls[col].pcoll.GetCoderId()
-               to = append(to, output{nodeID{tid, key}, coder})
-       }
-
-       return to
-}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/translate_test.go 
b/sdks/go/pkg/beam/core/runtime/harness/translate_test.go
deleted file mode 100644
index b1c98f9b083..00000000000
--- a/sdks/go/pkg/beam/core/runtime/harness/translate_test.go
+++ /dev/null
@@ -1,298 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package harness
-
-import (
-       "reflect"
-       "testing"
-
-       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
-       fnapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
-       rnapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
-)
-
-// Declare the DoFn that is used in the test graph
-type emitLinesFn struct {
-       Lines []string `json:"lines"`
-}
-
-func (e *emitLinesFn) ProcessElement(emit func(string)) {
-}
-
-// Create a fake symbol lookup. For this test, it only needs
-// to return the address of a valid function. The function never
-// actually gets called. We make sure that remains true by supplying
-// a function that only panics.
-
-func panicer() {
-       panic("Nothing should call this.")
-}
-
-type symlookup bool
-
-func (s symlookup) Sym2Addr(name string) (uintptr, error) {
-       return reflect.ValueOf(panicer).Pointer(), nil
-}
-
-var fakeSymbols symlookup
-
-// retain the key for the registered function to encode into the test 
execution plan.
-var emitLinesFnKey string
-
-func init() {
-       emitLinesFnKey = 
runtime.RegisterType(reflect.TypeOf((*emitLinesFn)(nil)).Elem())
-       runtime.SymbolResolver = fakeSymbols
-}
-
-func windowedString() *v1.FullType {
-       return &v1.FullType{
-               Type: &v1.Type{
-                       Kind:    v1.Type_SPECIAL,
-                       Special: v1.Type_WINDOWEDVALUE,
-               },
-               Components: []*v1.FullType{
-                       &v1.FullType{
-                               Type: &v1.Type{
-                                       Kind: v1.Type_STRING,
-                               },
-                       },
-               },
-       }
-}
-
-func makeDoFn() []byte {
-       // This is a schematic for a DoFn that takes a windowed string
-       // and produces a windowed string.
-       me := v1.MultiEdge{
-               Opcode: "ParDo",
-               Inbound: []*v1.MultiEdge_Inbound{
-                       &v1.MultiEdge_Inbound{
-                               Kind: v1.MultiEdge_Inbound_MAIN,
-                               Type: windowedString(),
-                       },
-               },
-               Outbound: []*v1.MultiEdge_Outbound{
-                       &v1.MultiEdge_Outbound{
-                               Type: windowedString(),
-                       },
-               },
-               // schematic of a function that takes two arguments, an input 
string, and an output emitter of string.
-               Fn: &v1.Fn{
-                       Fn: &v1.UserFn{
-                               Name: "main.doFn",
-                               Type: &v1.Type{
-                                       Kind: v1.Type_FUNC,
-                                       ParameterTypes: []*v1.Type{
-                                               &v1.Type{Kind: v1.Type_STRING},
-                                               &v1.Type{
-                                                       Kind: v1.Type_FUNC,
-                                                       ParameterTypes: 
[]*v1.Type{
-                                                               &v1.Type{Kind: 
v1.Type_STRING},
-                                                       },
-                                               },
-                                       },
-                               },
-                       },
-               },
-       }
-
-       res, err := protox.EncodeBase64(&me)
-       if err != nil {
-               panic(err)
-       }
-       return []byte(res)
-}
-
-func makeSource() []byte {
-       me := v1.MultiEdge{
-               Outbound: []*v1.MultiEdge_Outbound{
-                       &v1.MultiEdge_Outbound{
-                               Type: windowedString(),
-                       },
-               },
-               Fn: &v1.Fn{
-                       Opt: "{\"lines\":[\"old pond\",\"a frog leaps 
in\",\"water's sound\"]}",
-                       Type: &v1.Type{
-                               Kind: v1.Type_PTR,
-                               Element: &v1.Type{
-                                       Kind:        v1.Type_EXTERNAL,
-                                       ExternalKey: emitLinesFnKey,
-                               },
-                               ExternalKey: emitLinesFnKey,
-                       },
-               },
-       }
-
-       res, err := protox.EncodeBase64(&me)
-       if err != nil {
-               panic(err)
-       }
-       return []byte(res)
-}
-
-func createReferenceGraph() *fnapi_pb.ProcessBundleDescriptor {
-       return &fnapi_pb.ProcessBundleDescriptor{
-               Id: "-7",
-               Transforms: map[string]*rnapi_pb.PTransform{
-                       "-14": &rnapi_pb.PTransform{
-                               Spec: &rnapi_pb.FunctionSpec{
-                                       Urn:     
"urn:org.apache.beam:source:java:0.1",
-                                       Payload: makeSource(),
-                               },
-                               Outputs: map[string]string{"-6": "-9"},
-                       },
-                       "-17": &rnapi_pb.PTransform{
-                               Spec: &rnapi_pb.FunctionSpec{
-                                       Urn:     "urn:beam:dofn:javasdk:0.1",
-                                       Payload: makeDoFn(),
-                               },
-                               Inputs:  map[string]string{"-16": "-11"},
-                               Outputs: map[string]string{"out": "-13"},
-                       },
-                       "-20": &rnapi_pb.PTransform{
-                               Spec: &rnapi_pb.FunctionSpec{
-                                       Urn:     "urn:beam:dofn:javasdk:0.1",
-                                       Payload: makeDoFn(),
-                               },
-                               Inputs:  map[string]string{"-19": "-9"},
-                               Outputs: map[string]string{"out": "-11"},
-                       },
-                       "-4": &rnapi_pb.PTransform{
-                               Spec: &rnapi_pb.FunctionSpec{
-                                       Urn:     
"urn:org.apache.beam:sink:runner:0.1",
-                                       Payload: 
[]byte("\n\025\n\002-1\022\017localhost:36335"),
-                               },
-                               Inputs: map[string]string{"-3": "-13"},
-                       },
-               },
-               Pcollections: map[string]*rnapi_pb.PCollection{
-                       "-11": &rnapi_pb.PCollection{CoderId: "-10"},
-                       "-13": &rnapi_pb.PCollection{CoderId: "-10"},
-                       "-9":  &rnapi_pb.PCollection{CoderId: "-10"},
-               },
-               Coders: getCoders(),
-       }
-}
-
-func TestGraphTranslationSuccess(t *testing.T) {
-       _, err := translate(createReferenceGraph())
-       if err != nil {
-               t.Errorf("translation failed: %v", err)
-       }
-}
-
-func TestGraphCycleDetection(t *testing.T) {
-       g := createReferenceGraph()
-       // Introduce a cycle in the graph
-       g.Transforms["-14"].Inputs = map[string]string{"-50": "-52"}
-       g.Transforms["-4"].Outputs = map[string]string{"-51": "-52"}
-       _, err := translate(g)
-       if err == nil {
-               t.Errorf("got nil error, expected cycle error: %v", g)
-       }
-}
-
-func TestTooManyOutputsSource(t *testing.T) {
-       g := createReferenceGraph()
-       g.Transforms["-14"].Outputs["-51"] = "-11"
-       _, err := translate(g)
-       if err == nil {
-               t.Errorf("got nil error, expected bad source error: %v", g)
-       }
-}
-
-func getCoders() map[string]*rnapi_pb.Coder {
-       return map[string]*rnapi_pb.Coder{
-               "-12": &rnapi_pb.Coder{
-                       Spec: &rnapi_pb.SdkFunctionSpec{
-                               Spec: &rnapi_pb.FunctionSpec{
-                                       // TODO(wcn): this blob will go away 
once coders are implemented as components rather than monoliths.
-                                       Payload: 
[]byte("{\"@type\":\"kind:windowed_value\",\"component_encodings\":[{\"@type\":\"kind:pair\",\"component_encodings\":[{\"component_encodings\":[{\"@type\":\"CgRqc29uEgIIAhpMCjJnaXRodWIuY29tL2dvb2dsZS9nby1iZWFtLXNkay1kZXYvcGtnL2JlYW0uSlNPTkVuYxIWCBYiBAgZQA8qBggUEgIICCoECBlAASJSCjJnaXRodWIuY29tL2dvb2dsZS9nby1iZWFtLXNkay1kZXYvcGtnL2JlYW0uSlNPTkRlYxIcCBYiBAgZQAMiBggUEgIICCoECBlADyoECBlAAQ==\"}],\"@type\":\"kind:length_prefix\"},{\"@type\":\"kind:length_prefix\",\"component_encodings\":[{\"@type\":\"CgRqc29uEgIIAhpMCjJnaXRodWIuY29tL2dvb2dsZS9nby1iZWFtLXNkay1kZXYvcGtnL2JlYW0uSlNPTkVuYxIWCBYiBAgZQA8qBggUEgIICCoECBlAASJSCjJnaXRodWIuY29tL2dvb2dsZS9nby1iZWFtLXNkay1kZXYvcGtnL2JlYW0uSlNPTkRlYxIcCBYiBAgZQAMiBggUEgIICCoECBlADyoECBlAAQ==\"}]}]},{\"@type\":\"kind:global_window\"}]}"),
-                               },
-                       },
-               },
-               "ByteArrayCoder": &rnapi_pb.Coder{
-                       Spec: &rnapi_pb.SdkFunctionSpec{
-                               Spec: &rnapi_pb.FunctionSpec{
-                                       Urn: "urn:beam:coders:bytes:0.1",
-                               },
-                       },
-               },
-               "Coder": &rnapi_pb.Coder{
-                       Spec: &rnapi_pb.SdkFunctionSpec{
-                               Spec: &rnapi_pb.FunctionSpec{
-                                       Urn: 
"urn:beam:coders:global_window:0.1",
-                               },
-                       },
-               },
-               "-10": &rnapi_pb.Coder{
-                       Spec: &rnapi_pb.SdkFunctionSpec{
-                               Spec: &rnapi_pb.FunctionSpec{
-                                       Urn: 
"urn:beam:coders:windowed_value:0.1",
-                               },
-                       },
-                       ComponentCoderIds: []string{
-                               "ByteArrayCoder",
-                               "Coder",
-                       },
-               },
-       }
-}
-
-func TestTranslateCoders(t *testing.T) {
-       input := getCoders()
-       coders := graphx.NewCoderUnmarshaller(input)
-
-       for id := range input {
-               if id == "Coder" {
-                       if _, err := coders.Window(id); err != nil {
-                               t.Errorf("window translation failed: %v", err)
-                       }
-                       continue
-               }
-
-               if _, err := coders.Coder(id); err != nil {
-                       t.Errorf("coder translation failed: %v", err)
-               }
-       }
-}
-
-func TestUnknownOpcode(t *testing.T) {
-       // Check unexpected opcode
-}
-
-func TestTooManyInputsSink(t *testing.T) {
-       g := createReferenceGraph()
-       g.Transforms["-4"].Inputs["-51"] = "-11"
-       _, err := translate(g)
-       if err == nil {
-               t.Errorf("got nil error, expected bad sink error: %v", g)
-       }
-}
-
-func TestBundleHasNoRoots(t *testing.T) {
-       g := createReferenceGraph()
-       // Remove all the transforms
-       g.Transforms = make(map[string]*rnapi_pb.PTransform)
-
-       _, err := translate(g)
-       if err != errRootlessBundle {
-               t.Errorf("got %v, wanted %v in graph: %v", err, 
errRootlessBundle, g)
-       }
-}
diff --git a/sdks/go/pkg/beam/core/util/protox/base64.go 
b/sdks/go/pkg/beam/core/util/protox/base64.go
index 06db2cd7756..90fb13f51a7 100644
--- a/sdks/go/pkg/beam/core/util/protox/base64.go
+++ b/sdks/go/pkg/beam/core/util/protox/base64.go
@@ -22,6 +22,15 @@ import (
        "github.com/golang/protobuf/proto"
 )
 
+// MustEncodeBase64 encodes a proto wrapped in base64 and panics on failure.
+func MustEncodeBase64(msg proto.Message) string {
+       ret, err := EncodeBase64(msg)
+       if err != nil {
+               panic(err)
+       }
+       return ret
+}
+
 // EncodeBase64 encodes a proto wrapped in base64.
 func EncodeBase64(msg proto.Message) (string, error) {
        data, err := proto.Marshal(msg)
diff --git a/sdks/go/pkg/beam/core/util/protox/protox.go 
b/sdks/go/pkg/beam/core/util/protox/protox.go
new file mode 100644
index 00000000000..3555886eefc
--- /dev/null
+++ b/sdks/go/pkg/beam/core/util/protox/protox.go
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package protox contains utilities for working with protobufs.
+package protox
+
+import "github.com/golang/protobuf/proto"
+
+// MustEncode encode the message and panics on failure.
+func MustEncode(msg proto.Message) []byte {
+       data, err := proto.Marshal(msg)
+       if err != nil {
+               panic(err)
+       }
+       return data
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 367bdc6e444..c750a408011 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -16,6 +16,7 @@
 package dataflow
 
 import (
+       "bytes"
        "context"
        "encoding/json"
        "errors"
@@ -32,10 +33,13 @@ import (
 
        "github.com/apache/beam/sdks/go/pkg/beam"
        // Importing to get the side effect of the remote execution hook. See 
init().
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        _ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
        "github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
+       "github.com/golang/protobuf/proto"
        "golang.org/x/oauth2/google"
        df "google.golang.org/api/dataflow/v1b3"
        "google.golang.org/api/storage/v1"
@@ -65,6 +69,11 @@ func init() {
        beam.RegisterRunner("dataflow", Execute)
 }
 
+type dataflowOptions struct {
+       Options     map[string]string `json:"options"`
+       PipelineURL string            `json:"pipelineUrl"`
+}
+
 // Execute runs the given pipeline on Google Cloud Dataflow. It uses the
 // default application credentials to submit the job.
 func Execute(ctx context.Context, p *beam.Pipeline) error {
@@ -100,7 +109,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
        options := beam.PipelineOptions.Export()
 
-       // (1) Upload Go binary to GCS.
+       // (1) Upload Go binary and model to GCS.
 
        worker, err := buildLocalBinary(ctx)
        if err != nil {
@@ -111,6 +120,16 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                return err
        }
 
+       model, err := graphx.Marshal(edges, &graphx.Options{ContainerImageURL: 
*image})
+       if err != nil {
+               return fmt.Errorf("failed to generate model pipeline: %v", err)
+       }
+       modelURL, err := stageModel(ctx, project, *stagingLocation, 
protox.MustEncode(model))
+       if err != nil {
+               return err
+       }
+       log.Info(ctx, proto.MarshalTextString(model))
+
        // (2) Translate pipeline to v1b3 speak.
 
        steps, err := translate(edges)
@@ -133,7 +152,10 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                        }),
                        SdkPipelineOptions: newMsg(pipelineOptions{
                                DisplayData: findPipelineFlags(),
-                               Options:     options,
+                               Options: dataflowOptions{
+                                       Options:     options.Options,
+                                       PipelineURL: modelURL,
+                               },
                        }),
                        WorkerPools: []*df.WorkerPool{{
                                Kind: "harness",
@@ -165,7 +187,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                return nil
        }
 
-       // (3) Submit job.
+       // (4) Submit job.
 
        client, err := newClient(ctx, *endpoint)
        if err != nil {
@@ -213,6 +235,26 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        }
 }
 
+// stageModel uploads the pipeline model to GCS as a unique object.
+func stageModel(ctx context.Context, project, location string, model []byte) 
(string, error) {
+       bucket, prefix, err := gcsx.ParseObject(location)
+       if err != nil {
+               return "", fmt.Errorf("invalid staging location %v: %v", 
location, err)
+       }
+       obj := path.Join(prefix, fmt.Sprintf("pipeline-%v", 
time.Now().UnixNano()))
+       if *dryRun {
+               full := fmt.Sprintf("gs://%v/%v", bucket, obj)
+               log.Infof(ctx, "Dry-run: not uploading model %v", full)
+               return full, nil
+       }
+
+       client, err := gcsx.NewClient(ctx, storage.DevstorageReadWriteScope)
+       if err != nil {
+               return "", err
+       }
+       return gcsx.Upload(client, project, bucket, obj, bytes.NewReader(model))
+}
+
 // stageWorker uploads the worker binary to GCS as a unique object.
 func stageWorker(ctx context.Context, project, location, worker string) 
(string, error) {
        bucket, prefix, err := gcsx.ParseObject(location)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Go SDK support for portable pipelines
> -------------------------------------
>
>                 Key: BEAM-3287
>                 URL: https://issues.apache.org/jira/browse/BEAM-3287
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Henning Rohde
>            Assignee: Henning Rohde
>              Labels: portability
>
> The Go SDK should participate in the portability framework, incl. job 
> submission w/ a docker container image.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to