This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e58d4e1  [BEAM-8017] Plumb errors and remove panics from package 
graphx (#13028)
e58d4e1 is described below

commit e58d4e1369874a488094d4473fd090d77ec7a9ae
Author: Jing <[email protected]>
AuthorDate: Thu Oct 8 09:01:49 2020 -0700

    [BEAM-8017] Plumb errors and remove panics from package graphx (#13028)
    
    [BEAM-8017] Plumb errors and remove panics from package graphx
    This change to the exported methods and functions in package graphx is not 
backwards compatible: the APIs now return an extra error indicating the root 
cause of a failure.
---
 sdks/go/pkg/beam/core/runtime/graphx/coder.go      |  99 ++++---
 sdks/go/pkg/beam/core/runtime/graphx/coder_test.go |   6 +-
 sdks/go/pkg/beam/core/runtime/graphx/cogbk.go      |  31 +-
 sdks/go/pkg/beam/core/runtime/graphx/dataflow.go   |   6 +-
 sdks/go/pkg/beam/core/runtime/graphx/serialize.go  |  47 +--
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  | 327 +++++++++++++++------
 sdks/go/pkg/beam/core/runtime/graphx/xlang.go      |  12 +-
 sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go |  25 +-
 sdks/go/pkg/beam/core/runtime/xlangx/resolve.go    |   6 +-
 sdks/go/pkg/beam/core/runtime/xlangx/translate.go  |  36 ++-
 sdks/go/pkg/beam/runners/dataflow/dataflow.go      |   7 +-
 sdks/go/pkg/beam/runners/universal/universal.go    |   7 +-
 12 files changed, 431 insertions(+), 178 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go 
b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index c8be1d9..2c4bd43 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -76,10 +76,13 @@ func knownStandardCoders() []string {
 }
 
 // MarshalCoders marshals a list of coders into model coders.
-func MarshalCoders(coders []*coder.Coder) ([]string, map[string]*pipepb.Coder) 
{
+func MarshalCoders(coders []*coder.Coder) ([]string, map[string]*pipepb.Coder, 
error) {
        b := NewCoderMarshaller()
-       ids := b.AddMulti(coders)
-       return ids, b.Build()
+       if ids, err := b.AddMulti(coders); err != nil {
+               return nil, nil, err
+       } else {
+               return ids, b.Build(), nil
+       }
 }
 
 // UnmarshalCoders unmarshals coders.
@@ -160,19 +163,23 @@ func (b *CoderUnmarshaller) WindowCoder(id string) 
(*coder.WindowCoder, error) {
                return nil, err
        }
 
-       w := urnToWindowCoder(c.GetSpec().GetUrn())
+       w, err := urnToWindowCoder(c.GetSpec().GetUrn())
+       if err != nil {
+               return nil, errors.SetTopLevelMsgf(err, "failed to unmarshal 
window coder %v", id)
+       }
        b.windowCoders[id] = w
        return w, nil
 }
 
-func urnToWindowCoder(urn string) *coder.WindowCoder {
+func urnToWindowCoder(urn string) (*coder.WindowCoder, error) {
        switch urn {
        case urnGlobalWindow:
-               return coder.NewGlobalWindow()
+               return coder.NewGlobalWindow(), nil
        case urnIntervalWindow:
-               return coder.NewIntervalWindow()
+               return coder.NewIntervalWindow(), nil
        default:
-               panic(fmt.Sprintf("Failed to translate URN to window coder, 
unexpected URN: %v", urn))
+               err := errors.Errorf("unexpected URN %v for window coder", urn)
+               return nil, errors.WithContext(err, "translate URN to window 
coder")
        }
 }
 
@@ -422,19 +429,19 @@ func NewCoderMarshaller() *CoderMarshaller {
 }
 
 // Add adds the given coder to the set and returns its id. Idempotent.
-func (b *CoderMarshaller) Add(c *coder.Coder) string {
+func (b *CoderMarshaller) Add(c *coder.Coder) (string, error) {
        switch c.Kind {
        case coder.Custom:
                ref, err := encodeCustomCoder(c.Custom)
                if err != nil {
                        typeName := c.Custom.Name
-                       panic(errors.SetTopLevelMsgf(err, "Failed to encode 
custom coder for type %s. "+
+                       return "", errors.SetTopLevelMsgf(err, "failed to 
encode custom coder for type %s. "+
                                "Make sure the type was registered before 
calling beam.Init. For example: "+
-                               
"beam.RegisterType(reflect.TypeOf((*TypeName)(nil)).Elem())", typeName))
+                               
"beam.RegisterType(reflect.TypeOf((*TypeName)(nil)).Elem())", typeName)
                }
                data, err := protox.EncodeBase64(ref)
                if err != nil {
-                       panic(errors.Wrapf(err, "Failed to marshal custom coder 
%v", c))
+                       return "", errors.Wrapf(err, "failed to marshal custom 
coder %v", c)
                }
                inner := b.internCoder(&pipepb.Coder{
                        Spec: &pipepb.FunctionSpec{
@@ -442,15 +449,20 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
                                Payload: []byte(data),
                        },
                })
-               return b.internBuiltInCoder(urnLengthPrefixCoder, inner)
+               return b.internBuiltInCoder(urnLengthPrefixCoder, inner), nil
 
        case coder.KV:
-               comp := b.AddMulti(c.Components)
-               return b.internBuiltInCoder(urnKVCoder, comp...)
+               comp, err := b.AddMulti(c.Components)
+               if err != nil {
+                       return "", errors.Wrapf(err, "failed to marshal KV 
coder %v", c)
+               }
+               return b.internBuiltInCoder(urnKVCoder, comp...), nil
 
        case coder.CoGBK:
-               comp := b.AddMulti(c.Components)
-
+               comp, err := b.AddMulti(c.Components)
+               if err != nil {
+                       return "", errors.Wrapf(err, "failed to marshal CoGBK 
coder %v", c)
+               }
                value := comp[1]
                if len(comp) > 2 {
                        // TODO(BEAM-490): don't inject union coder for CoGBK.
@@ -461,62 +473,77 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
 
                // SDKs always provide iterableCoder to runners, but can 
receive StateBackedIterables in return.
                stream := b.internBuiltInCoder(urnIterableCoder, value)
-               return b.internBuiltInCoder(urnKVCoder, comp[0], stream)
+               return b.internBuiltInCoder(urnKVCoder, comp[0], stream), nil
 
        case coder.WindowedValue:
-               comp := b.AddMulti(c.Components)
-               comp = append(comp, b.AddWindowCoder(c.Window))
-               return b.internBuiltInCoder(urnWindowedValueCoder, comp...)
+               comp := []string{}
+               if ids, err := b.AddMulti(c.Components); err != nil {
+                       return "", errors.Wrapf(err, "failed to marshal window 
coder %v", c)
+               } else {
+                       comp = append(comp, ids...)
+               }
+               if id, err := b.AddWindowCoder(c.Window); err != nil {
+                       return "", errors.Wrapf(err, "failed to marshal window 
coder %v", c)
+               } else {
+                       comp = append(comp, id)
+               }
+               return b.internBuiltInCoder(urnWindowedValueCoder, comp...), nil
 
        case coder.Bytes:
                // TODO(herohde) 6/27/2017: add length-prefix and not assume 
nested by context?
-               return b.internBuiltInCoder(urnBytesCoder)
+               return b.internBuiltInCoder(urnBytesCoder), nil
 
        case coder.Bool:
-               return b.internBuiltInCoder(urnBoolCoder)
+               return b.internBuiltInCoder(urnBoolCoder), nil
 
        case coder.VarInt:
-               return b.internBuiltInCoder(urnVarIntCoder)
+               return b.internBuiltInCoder(urnVarIntCoder), nil
 
        case coder.Double:
-               return b.internBuiltInCoder(urnDoubleCoder)
+               return b.internBuiltInCoder(urnDoubleCoder), nil
 
        case coder.String:
-               return b.internBuiltInCoder(urnStringCoder)
+               return b.internBuiltInCoder(urnStringCoder), nil
 
        case coder.Row:
                rt := c.T.Type()
                s, err := schema.FromType(rt)
                if err != nil {
-                       panic(errors.SetTopLevelMsgf(err, "Failed to convert 
type %v to a schema.", rt))
+                       return "", errors.SetTopLevelMsgf(err, "failed to 
convert type %v to a schema.", rt)
                }
-               return b.internRowCoder(s)
+               return b.internRowCoder(s), nil
 
        // TODO(BEAM-10660): Handle coder.Timer support.
 
        default:
-               panic(fmt.Sprintf("Failed to marshal coder %v, unexpected coder 
kind: %v", c, c.Kind))
+               err := errors.Errorf("unexpected coder kind: %v", c.Kind)
+               return "", errors.WithContextf(err, "failed to marshal coder 
%v", c)
        }
 }
 
 // AddMulti adds the given coders to the set and returns their ids. Idempotent.
-func (b *CoderMarshaller) AddMulti(list []*coder.Coder) []string {
+func (b *CoderMarshaller) AddMulti(list []*coder.Coder) ([]string, error) {
        var ids []string
        for _, c := range list {
-               ids = append(ids, b.Add(c))
+               if id, err := b.Add(c); err != nil {
+                       return nil, errors.Wrapf(err, "failed to marshal the 
coder %v.", c)
+               } else {
+                       ids = append(ids, id)
+               }
        }
-       return ids
+       return ids, nil
 }
 
 // AddWindowCoder adds a window coder.
-func (b *CoderMarshaller) AddWindowCoder(w *coder.WindowCoder) string {
+func (b *CoderMarshaller) AddWindowCoder(w *coder.WindowCoder) (string, error) 
{
        switch w.Kind {
        case coder.GlobalWindow:
-               return b.internBuiltInCoder(urnGlobalWindow)
+               return b.internBuiltInCoder(urnGlobalWindow), nil
        case coder.IntervalWindow:
-               return b.internBuiltInCoder(urnIntervalWindow)
+               return b.internBuiltInCoder(urnIntervalWindow), nil
        default:
-               panic(fmt.Sprintf("Failed to add window coder %v, unexpected 
window kind: %v", w, w.Kind))
+               err := errors.Errorf("window coder with unexpected type %v", 
w.Kind)
+               return "", errors.WithContextf(err, "failed to unmarshal window 
coder %v", w)
        }
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go 
b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
index cb9dcff..c781e30 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
@@ -107,7 +107,11 @@ func TestMarshalUnmarshalCoders(t *testing.T) {
 
        for _, test := range tests {
                t.Run(test.name, func(t *testing.T) {
-                       coders, err := 
graphx.UnmarshalCoders(graphx.MarshalCoders([]*coder.Coder{test.c}))
+                       ids, marshalCoders, err := 
graphx.MarshalCoders([]*coder.Coder{test.c})
+                       if err != nil {
+                               t.Fatalf("Marshal(%v) failed: %v", test.c, err)
+                       }
+                       coders, err := graphx.UnmarshalCoders(ids, 
marshalCoders)
                        if err != nil {
                                t.Fatalf("Unmarshal(Marshal(%v)) failed: %v", 
test.c, err)
                        }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go 
b/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
index 0ec3ba8..ed5587e 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
@@ -16,13 +16,12 @@
 package graphx
 
 import (
-       "fmt"
-
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/coderx"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // CoGBK support
@@ -70,37 +69,47 @@ const (
 )
 
 // MakeKVUnionCoder returns KV<K,KV<int,[]byte>> for a given CoGBK.
-func MakeKVUnionCoder(gbk *graph.MultiEdge) *coder.Coder {
+func MakeKVUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error) {
        if gbk.Op != graph.CoGBK {
-               panic(fmt.Sprintf("expected CoGBK, got %v", gbk))
+               err := errors.Errorf("expected CoGBK, got %v", gbk)
+               return nil, errors.WithContext(err, "failed to make KV Union 
coder")
        }
 
        from := gbk.Input[0].From
        key := from.Coder.Components[0]
-       return coder.NewKV([]*coder.Coder{key, makeUnionCoder()})
+       kvCoder, err := makeUnionCoder()
+       if err != nil {
+               return nil, errors.Wrapf(err, "failed to make KV Union coder.")
+       }
+       return coder.NewKV([]*coder.Coder{key, kvCoder}), nil
 }
 
 // MakeGBKUnionCoder returns CoGBK<K,KV<int,[]byte>> for a given CoGBK.
-func MakeGBKUnionCoder(gbk *graph.MultiEdge) *coder.Coder {
+func MakeGBKUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error) {
        if gbk.Op != graph.CoGBK {
-               panic(fmt.Sprintf("expected CoGBK, got %v", gbk))
+               err := errors.Errorf("expected CoGBK, got %v", gbk)
+               return nil, errors.WithContext(err, "failed to make GBK Union 
coder")
        }
 
        from := gbk.Input[0].From
        key := from.Coder.Components[0]
-       return coder.NewCoGBK([]*coder.Coder{key, makeUnionCoder()})
+       kvCoder, err := makeUnionCoder()
+       if err != nil {
+               return nil, errors.Wrapf(err, "failed to make GBK Union coder.")
+       }
+       return coder.NewCoGBK([]*coder.Coder{key, kvCoder}), nil
 }
 
 // makeUnionCoder returns a coder for the raw union value, KV<int,[]byte>. It 
uses
 // varintz instead of the built-in varint to avoid the implicit 
length-prefixing
 // of varint otherwise introduced by Dataflow.
-func makeUnionCoder() *coder.Coder {
+func makeUnionCoder() (*coder.Coder, error) {
        c, err := coderx.NewVarIntZ(reflectx.Int)
        if err != nil {
-               panic(err)
+               return nil, err
        }
        return coder.NewKV([]*coder.Coder{
                {Kind: coder.Custom, T: typex.New(reflectx.Int), Custom: c},
                coder.NewBytes(),
-       })
+       }), nil
 }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go 
b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
index 30079e8..3f68e63 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
@@ -64,12 +64,12 @@ func WrapIterable(c *CoderRef) *CoderRef {
 }
 
 // WrapWindowed adds a windowed coder for Dataflow collections.
-func WrapWindowed(c *CoderRef, wc *coder.WindowCoder) *CoderRef {
+func WrapWindowed(c *CoderRef, wc *coder.WindowCoder) (*CoderRef, error) {
        w, err := encodeWindowCoder(wc)
        if err != nil {
-               panic(err)
+               return nil, err
        }
-       return &CoderRef{Type: windowedValueType, Components: []*CoderRef{c, 
w}, IsWrapper: true}
+       return &CoderRef{Type: windowedValueType, Components: []*CoderRef{c, 
w}, IsWrapper: true}, nil
 }
 
 // EncodeCoderRefs returns the encoded forms understood by the runner.
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go 
b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 251f222..9a9094f 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -16,7 +16,6 @@
 package graphx
 
 import (
-       "fmt"
        "reflect"
        "strings"
 
@@ -63,7 +62,11 @@ func EncodeMultiEdge(edge *graph.MultiEdge) 
(*v1pb.MultiEdge, error) {
        }
 
        for _, in := range edge.Input {
-               kind := encodeInputKind(in.Kind)
+               kind, err := encodeInputKind(in.Kind)
+               if err != nil {
+                       wrapped := errors.Wrap(err, "bad input type")
+                       return nil, errors.WithContextf(wrapped, "encoding 
userfn %v", edge)
+               }
                t, err := encodeFullType(in.Type)
                if err != nil {
                        wrapped := errors.Wrap(err, "bad input type")
@@ -241,7 +244,7 @@ func encodeFn(u *graph.Fn) (*v1pb.Fn, error) {
                typ, err := encodeType(t)
                if err != nil {
                        wrapped := errors.Wrapf(err, "failed to encode receiver 
type %T", u.Recv)
-                       panic(errors.WithContextf(wrapped, "encoding structural 
DoFn %v", u))
+                       return nil, errors.WithContextf(wrapped, "encoding 
structural DoFn %v", u)
                }
 
                data, err := jsonx.Marshal(u.Recv)
@@ -252,7 +255,7 @@ func encodeFn(u *graph.Fn) (*v1pb.Fn, error) {
                return &v1pb.Fn{Type: typ, Opt: string(data)}, nil
 
        default:
-               panic(fmt.Sprintf("Failed to encode DoFn %v, missing fn", u))
+               return nil, errors.Errorf("failed to encode DoFn %v, missing 
fn", u)
        }
 }
 
@@ -479,7 +482,11 @@ func encodeType(t reflect.Type) (*v1pb.Type, error) {
                        wrapped := errors.Wrap(err, "bad element type")
                        return nil, errors.WithContextf(wrapped, "encoding 
channel %v", t)
                }
-               dir := encodeChanDir(t.ChanDir())
+               dir, err := encodeChanDir(t.ChanDir())
+               if err != nil {
+                       wrapped := errors.Wrap(err, "bad channel direction")
+                       return nil, errors.WithContextf(wrapped, "encoding 
channel %v", t)
+               }
                return &v1pb.Type{Kind: v1pb.Type_CHAN, Element: elm, ChanDir: 
dir}, nil
 
        case reflect.Ptr:
@@ -725,16 +732,17 @@ func decodeInts(offsets []int32) []int {
        return ret
 }
 
-func encodeChanDir(dir reflect.ChanDir) v1pb.Type_ChanDir {
+func encodeChanDir(dir reflect.ChanDir) (v1pb.Type_ChanDir, error) {
        switch dir {
        case reflect.RecvDir:
-               return v1pb.Type_RECV
+               return v1pb.Type_RECV, nil
        case reflect.SendDir:
-               return v1pb.Type_SEND
+               return v1pb.Type_SEND, nil
        case reflect.BothDir:
-               return v1pb.Type_BOTH
+               return v1pb.Type_BOTH, nil
        default:
-               panic(fmt.Sprintf("Failed to encode channel direction, invalid 
value: %v", dir))
+               err := errors.Errorf("invalid value: %v", dir)
+               return v1pb.Type_BOTH, errors.WithContextf(err, "encoding 
channel direction")
        }
 }
 
@@ -752,24 +760,25 @@ func decodeChanDir(dir v1pb.Type_ChanDir) 
(reflect.ChanDir, error) {
        }
 }
 
-func encodeInputKind(k graph.InputKind) v1pb.MultiEdge_Inbound_InputKind {
+func encodeInputKind(k graph.InputKind) (v1pb.MultiEdge_Inbound_InputKind, 
error) {
        switch k {
        case graph.Main:
-               return v1pb.MultiEdge_Inbound_MAIN
+               return v1pb.MultiEdge_Inbound_MAIN, nil
        case graph.Singleton:
-               return v1pb.MultiEdge_Inbound_SINGLETON
+               return v1pb.MultiEdge_Inbound_SINGLETON, nil
        case graph.Slice:
-               return v1pb.MultiEdge_Inbound_SLICE
+               return v1pb.MultiEdge_Inbound_SLICE, nil
        case graph.Map:
-               return v1pb.MultiEdge_Inbound_MAP
+               return v1pb.MultiEdge_Inbound_MAP, nil
        case graph.MultiMap:
-               return v1pb.MultiEdge_Inbound_MULTIMAP
+               return v1pb.MultiEdge_Inbound_MULTIMAP, nil
        case graph.Iter:
-               return v1pb.MultiEdge_Inbound_ITER
+               return v1pb.MultiEdge_Inbound_ITER, nil
        case graph.ReIter:
-               return v1pb.MultiEdge_Inbound_REITER
+               return v1pb.MultiEdge_Inbound_REITER, nil
        default:
-               panic(fmt.Sprintf("Failed to encode input kind, invalid value: 
%v", k))
+               err := errors.Errorf("invalid value: %v", k)
+               return v1pb.MultiEdge_Inbound_MAIN, errors.WithContextf(err, 
"encoding input kind")
        }
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index d427e9f..70d39a3 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -77,12 +77,12 @@ func goCapabilities() []string {
 }
 
 // CreateEnvironment produces the appropriate payload for the type of 
environment.
-func CreateEnvironment(ctx context.Context, urn string, 
extractEnvironmentConfig func(context.Context) string) *pipepb.Environment {
+func CreateEnvironment(ctx context.Context, urn string, 
extractEnvironmentConfig func(context.Context) string) (*pipepb.Environment, 
error) {
        var serializedPayload []byte
        switch urn {
        case "beam:env:process:v1":
                // TODO Support process based SDK Harness.
-               panic(fmt.Sprintf("Unsupported environment %v", urn))
+               return nil, errors.Errorf("unsupported environment %v", urn)
        case "beam:env:external:v1":
                config := extractEnvironmentConfig(ctx)
                payload := &pipepb.ExternalPayload{Endpoint: 
&pipepb.ApiServiceDescriptor{Url: config}}
@@ -107,7 +107,7 @@ func CreateEnvironment(ctx context.Context, urn string, 
extractEnvironmentConfig
                                }),
                        },
                },
-       }
+       }, nil
 }
 
 // TODO(herohde) 11/6/2017: move some of the configuration into the graph 
during construction.
@@ -124,10 +124,16 @@ func Marshal(edges []*graph.MultiEdge, opt *Options) 
(*pipepb.Pipeline, error) {
 
        m := newMarshaller(opt)
        for _, edge := range tree.Edges {
-               m.addMultiEdge(edge)
+               _, err := m.addMultiEdge(edge)
+               if err != nil {
+                       return nil, err
+               }
        }
        for _, t := range tree.Children {
-               m.addScopeTree(t)
+               _, err := m.addScopeTree(t)
+               if err != nil {
+                       return nil, err
+               }
        }
 
        p := &pipepb.Pipeline{
@@ -184,18 +190,26 @@ func (m *marshaller) getRequirements() []string {
        return reqs
 }
 
-func (m *marshaller) addScopeTree(s *ScopeTree) string {
+func (m *marshaller) addScopeTree(s *ScopeTree) (string, error) {
        id := scopeID(s.Scope.Scope)
        if _, exists := m.transforms[id]; exists {
-               return id
+               return id, nil
        }
 
        var subtransforms []string
        for _, edge := range s.Edges {
-               subtransforms = append(subtransforms, m.addMultiEdge(edge)...)
+               ids, err := m.addMultiEdge(edge)
+               if err != nil {
+                       return "", errors.Wrapf(err, "failed to add scope tree: 
%v", s)
+               }
+               subtransforms = append(subtransforms, ids...)
        }
        for _, tree := range s.Children {
-               subtransforms = append(subtransforms, m.addScopeTree(tree))
+               id, err := m.addScopeTree(tree)
+               if err != nil {
+                       return "", errors.Wrapf(err, "failed to add scope tree: 
%v", s)
+               }
+               subtransforms = append(subtransforms, id)
        }
 
        transform := &pipepb.PTransform{
@@ -204,60 +218,89 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
                EnvironmentId: m.addDefaultEnv(),
        }
 
-       m.updateIfCombineComposite(s, transform)
+       if err := m.updateIfCombineComposite(s, transform); err != nil {
+               return "", errors.Wrapf(err, "failed to add scope tree: %v", s)
+       }
 
        m.transforms[id] = transform
-       return id
+       return id, nil
 }
 
 // updateIfCombineComposite examines the scope tree and sets the PTransform 
Spec
 // to be a CombinePerKey with a CombinePayload if it's a liftable composite.
 // Beam Portability requires that composites contain an implementation for 
runners
 // that don't understand the URN and Payload, which this lightly checks for.
-func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform 
*pipepb.PTransform) {
+func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform 
*pipepb.PTransform) error {
        if s.Scope.Name != graph.CombinePerKeyScope ||
                len(s.Edges) != 2 ||
                len(s.Edges[0].Edge.Input) != 1 ||
                len(s.Edges[1].Edge.Output) != 1 ||
                s.Edges[1].Edge.Op != graph.Combine {
-               return
+               return nil
        }
 
        edge := s.Edges[1].Edge
-       acID := m.coders.Add(edge.AccumCoder)
+       acID, err := m.coders.Add(edge.AccumCoder)
+       if err != nil {
+               return errors.Wrapf(err, "failed to update PTransform spec: 
%v", transform)
+       }
+       mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge)
+       if err != nil {
+               return errors.Wrapf(err, "failed to update PTransform spec: 
%v", transform)
+       }
        payload := &pipepb.CombinePayload{
                CombineFn: &pipepb.FunctionSpec{
                        Urn:     URNDoFn,
-                       Payload: []byte(mustEncodeMultiEdgeBase64(edge)),
+                       Payload: []byte(mustEncodeMultiEdge),
                },
                AccumulatorCoderId: acID,
        }
        transform.Spec = &pipepb.FunctionSpec{Urn: URNCombinePerKey, Payload: 
protox.MustEncode(payload)}
+       return nil
 }
 
-func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
+func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
+       handleErr := func(err error) ([]string, error) {
+               return nil, errors.Wrapf(err, "failed to add input kind: %v", 
edge)
+       }
        id := edgeID(edge.Edge)
        if _, exists := m.transforms[id]; exists {
-               return []string{id}
+               return []string{id}, nil
        }
 
        switch {
        case edge.Edge.Op == graph.CoGBK && len(edge.Edge.Input) > 1:
-               return []string{m.expandCoGBK(edge)}
+               cogbkID, err := m.expandCoGBK(edge)
+               if err != nil {
+                       return handleErr(err)
+               }
+               return []string{cogbkID}, nil
        case edge.Edge.Op == graph.Reshuffle:
-               return []string{m.expandReshuffle(edge)}
+               reshuffleID, err := m.expandReshuffle(edge)
+               if err != nil {
+                       return handleErr(err)
+               }
+               return []string{reshuffleID}, nil
        case edge.Edge.Op == graph.External && edge.Edge.Payload == nil:
-               return []string{m.expandCrossLanguage(edge)}
+               edgeID, err := m.expandCrossLanguage(edge)
+               if err != nil {
+                       return handleErr(err)
+               }
+               return []string{edgeID}, nil
        }
 
        inputs := make(map[string]string)
        for i, in := range edge.Edge.Input {
-               m.addNode(in.From)
+               if _, err := m.addNode(in.From); err != nil {
+                       return handleErr(err)
+               }
                inputs[fmt.Sprintf("i%v", i)] = nodeID(in.From)
        }
        outputs := make(map[string]string)
        for i, out := range edge.Edge.Output {
-               m.addNode(out.To)
+               if _, err := m.addNode(out.To); err != nil {
+                       return handleErr(err)
+               }
                outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
        }
 
@@ -281,7 +324,13 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string 
{
                                // "", even if the input is already KV.
 
                                out := fmt.Sprintf("%v_keyed%v_%v", 
nodeID(in.From), edgeID(edge.Edge), i)
-                               m.makeNode(out, 
m.coders.Add(makeBytesKeyedCoder(in.From.Coder)), in.From)
+                               coderId, err := 
m.coders.Add(makeBytesKeyedCoder(in.From.Coder))
+                               if err != nil {
+                                       return handleErr(err)
+                               }
+                               if _, err := m.makeNode(out, coderId, in.From); 
err != nil {
+                                       return handleErr(err)
+                               }
 
                                payload := &pipepb.ParDoPayload{
                                        DoFn: &pipepb.FunctionSpec{
@@ -322,31 +371,44 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) 
[]string {
                                }
 
                        case graph.Map, graph.MultiMap:
-                               panic("NYI")
+                               return nil, errors.Errorf("not implemented")
 
                        default:
-                               panic(fmt.Sprintf("unexpected input kind: %v", 
edge))
+                               return nil, errors.Errorf("unexpected input 
kind: %v", edge)
                        }
                }
 
+               mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge.Edge)
+               if err != nil {
+                       return handleErr(err)
+               }
+
                payload := &pipepb.ParDoPayload{
                        DoFn: &pipepb.FunctionSpec{
                                Urn:     URNDoFn,
-                               Payload: 
[]byte(mustEncodeMultiEdgeBase64(edge.Edge)),
+                               Payload: []byte(mustEncodeMultiEdge),
                        },
                        SideInputs: si,
                }
                if edge.Edge.DoFn.IsSplittable() {
-                       payload.RestrictionCoderId = 
m.coders.Add(edge.Edge.RestrictionCoder)
+                       coderId, err := m.coders.Add(edge.Edge.RestrictionCoder)
+                       if err != nil {
+                               return handleErr(err)
+                       }
+                       payload.RestrictionCoderId = coderId
                        m.requirements[URNRequiresSplittableDoFn] = true
                }
                spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
 
        case graph.Combine:
+               mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge.Edge)
+               if err != nil {
+                       return handleErr(err)
+               }
                payload := &pipepb.ParDoPayload{
                        DoFn: &pipepb.FunctionSpec{
                                Urn:     URNDoFn,
-                               Payload: 
[]byte(mustEncodeMultiEdgeBase64(edge.Edge)),
+                               Payload: []byte(mustEncodeMultiEdge),
                        },
                }
                spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
@@ -358,8 +420,12 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string 
{
                spec = &pipepb.FunctionSpec{Urn: URNGBK}
 
        case graph.WindowInto:
+               windowFn, err := makeWindowFn(edge.Edge.WindowFn)
+               if err != nil {
+                       return handleErr(err)
+               }
                payload := &pipepb.WindowIntoPayload{
-                       WindowFn: makeWindowFn(edge.Edge.WindowFn),
+                       WindowFn: windowFn,
                }
                spec = &pipepb.FunctionSpec{Urn: URNWindow, Payload: 
protox.MustEncode(payload)}
 
@@ -367,7 +433,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
                spec = &pipepb.FunctionSpec{Urn: edge.Edge.Payload.URN, 
Payload: edge.Edge.Payload.Data}
 
        default:
-               panic(fmt.Sprintf("Unexpected opcode: %v", edge.Edge.Op))
+               err := errors.Errorf("unexpected opcode: %v", edge.Edge.Op)
+               return handleErr(err)
        }
 
        var transformEnvID = ""
@@ -384,17 +451,19 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) 
[]string {
        }
        m.transforms[id] = transform
        allPIds = append(allPIds, id)
-       return allPIds
+       return allPIds, nil
 }
 
-func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) string {
+func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) (string, error) {
        edge := namedEdge.Edge
        id := edgeID(edge)
 
        inputs := make(map[string]string)
 
        for tag, n := range ExternalInputs(edge) {
-               m.addNode(n)
+               if _, err := m.addNode(n); err != nil {
+                       return "", errors.Wrapf(err, "failed to expand cross 
language transform for edge: %v", namedEdge)
+               }
                // Ignore tag if it is a dummy SourceInputTag
                if tag == graph.SourceInputTag {
                        tag = fmt.Sprintf("i%v", edge.External.InputsMap[tag])
@@ -420,33 +489,61 @@ func (m *marshaller) expandCrossLanguage(namedEdge 
NamedEdge) string {
                // map consumers of these outputs to the expanded transform's 
outputs.
                outputs := make(map[string]string)
                for i, out := range edge.Output {
-                       m.addNode(out.To)
+                       if _, err := m.addNode(out.To); err != nil {
+                               return "", errors.Wrapf(err, "failed to expand 
cross language transform for edge: %v", namedEdge)
+                       }
                        outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
                }
                transform.Outputs = outputs
-               transform.EnvironmentId = 
ExpandedTransform(edge.External.Expanded).EnvironmentId
+               environment, err := ExpandedTransform(edge.External.Expanded)
+               if err != nil {
+                       return "", errors.Wrapf(err, "failed to expand cross 
language transform for edge: %v", namedEdge)
+               }
+               transform.EnvironmentId = environment.EnvironmentId
        }
 
        m.transforms[id] = transform
-       return id
+       return id, nil
 }
 
-func (m *marshaller) expandCoGBK(edge NamedEdge) string {
+func (m *marshaller) expandCoGBK(edge NamedEdge) (string, error) {
        // TODO(BEAM-490): replace once CoGBK is a primitive. For now, we have 
to translate
        // CoGBK with multiple PCollections as described in cogbk.go.
+       handleErr := func(err error) (string, error) {
+               return "", errors.Wrapf(err, "failed to expand CoGBK transform 
for edge: %v", edge)
+       }
 
        id := edgeID(edge.Edge)
-       kvCoderID := m.coders.Add(MakeKVUnionCoder(edge.Edge))
-       gbkCoderID := m.coders.Add(MakeGBKUnionCoder(edge.Edge))
+       kvCoder, err := MakeKVUnionCoder(edge.Edge)
+       if err != nil {
+               return handleErr(err)
+       }
+       kvCoderID, err := m.coders.Add(kvCoder)
+       if err != nil {
+               return handleErr(err)
+       }
+       gbkCoder, err := MakeGBKUnionCoder(edge.Edge)
+       if err != nil {
+               return handleErr(err)
+       }
+       gbkCoderID, err := m.coders.Add(gbkCoder)
+       if err != nil {
+               return handleErr(err)
+       }
 
        var subtransforms []string
 
        inputs := make(map[string]string)
        for i, in := range edge.Edge.Input {
-               m.addNode(in.From)
+
+               if _, err := m.addNode(in.From); err != nil {
+                       return handleErr(err)
+               }
 
                out := fmt.Sprintf("%v_%v_inject%v", nodeID(in.From), id, i)
-               m.makeNode(out, kvCoderID, in.From)
+               if _, err := m.makeNode(out, kvCoderID, in.From); err != nil {
+                       return handleErr(err)
+               }
 
                // Inject(i)
 
@@ -481,7 +578,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
        // Flatten
 
        out := fmt.Sprintf("%v_flatten", nodeID(outNode))
-       m.makeNode(out, kvCoderID, outNode)
+       if _, err := m.makeNode(out, kvCoderID, outNode); err != nil {
+               return handleErr(err)
+       }
 
        flattenID := fmt.Sprintf("%v_flatten", id)
        flatten := &pipepb.PTransform{
@@ -497,7 +596,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
        // CoGBK
 
        gbkOut := fmt.Sprintf("%v_out", nodeID(outNode))
-       m.makeNode(gbkOut, gbkCoderID, outNode)
+       if _, err := m.makeNode(gbkOut, gbkCoderID, outNode); err != nil {
+               return handleErr(err)
+       }
 
        gbkID := fmt.Sprintf("%v_gbk", id)
        gbk := &pipepb.PTransform{
@@ -511,7 +612,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
 
        // Expand
 
-       m.addNode(outNode)
+       if _, err := m.addNode(outNode); err != nil {
+               return handleErr(err)
+       }
 
        expandID := fmt.Sprintf("%v_expand", id)
        payload := &pipepb.ParDoPayload{
@@ -543,7 +646,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
                Subtransforms: subtransforms,
                EnvironmentId: m.addDefaultEnv(),
        }
-       return cogbkID
+       return cogbkID, nil
 }
 
 // expandReshuffle translates resharding to a composite reshuffle
@@ -569,36 +672,62 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
 // User code is able to write reshards, but it's easier to access
 // the window coders framework side, which is critical for the reshard
 // to function with unbounded inputs.
-func (m *marshaller) expandReshuffle(edge NamedEdge) string {
+func (m *marshaller) expandReshuffle(edge NamedEdge) (string, error) {
+       handleErr := func(err error) (string, error) {
+               return "", errors.Wrapf(err, "failed to expand Reshuffle 
transform for edge: %v", edge)
+       }
        id := edgeID(edge.Edge)
-       var kvCoderID, gbkCoderID string
-       {
-               kv := makeUnionCoder()
-               kvCoderID = m.coders.Add(kv)
-               gbkCoderID = m.coders.Add(coder.NewCoGBK(kv.Components))
+       kvCoder, err := makeUnionCoder()
+       if err != nil {
+               return handleErr(err)
+       }
+       kvCoderID, err := m.coders.Add(kvCoder)
+       if err != nil {
+               return handleErr(err)
+       }
+       gbkCoderID, err := m.coders.Add(coder.NewCoGBK(kvCoder.Components))
+       if err != nil {
+               return handleErr(err)
        }
 
        var subtransforms []string
 
        in := edge.Edge.Input[0]
 
-       origInput := m.addNode(in.From)
+       origInput, err := m.addNode(in.From)
+       if err != nil {
+               return handleErr(err)
+       }
        // We need to preserve the old windowing/triggering here
        // for re-instatement after the GBK.
        preservedWSId := m.pcollections[origInput].GetWindowingStrategyId()
 
        // Get the windowing strategy from before:
        postReify := fmt.Sprintf("%v_%v_reifyts", nodeID(in.From), id)
-       m.makeNode(postReify, kvCoderID, in.From)
+       if _, err := m.makeNode(postReify, kvCoderID, in.From); err != nil {
+               return handleErr(err)
+       }
 
        // We need to replace postReify's windowing strategy with one 
appropriate
        // for reshuffles.
        {
                wfn := window.NewGlobalWindows()
+               windowFn, err := makeWindowFn(wfn)
+               if err != nil {
+                       return handleErr(err)
+               }
+               coderId, err := makeWindowCoder(wfn)
+               if err != nil {
+                       return handleErr(err)
+               }
+               windowCoderId, err := m.coders.AddWindowCoder(coderId)
+               if err != nil {
+                       return handleErr(err)
+               }
                m.pcollections[postReify].WindowingStrategyId =
                        m.internWindowingStrategy(&pipepb.WindowingStrategy{
                                // Not segregated by time...
-                               WindowFn: makeWindowFn(wfn),
+                               WindowFn: windowFn,
                                // ...output after every element is received...
                                Trigger: &pipepb.Trigger{
                                        Trigger: &pipepb.Trigger_Always_{
@@ -614,7 +743,7 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string 
{
                                // TODO(BEAM-3304): migrate to user side 
operations once trigger support is in.
                                EnvironmentId:   m.addDefaultEnv(),
                                MergeStatus:     pipepb.MergeStatus_NON_MERGING,
-                               WindowCoderId:   
m.coders.AddWindowCoder(makeWindowCoder(wfn)),
+                               WindowCoderId:   windowCoderId,
                                ClosingBehavior: 
pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
                                AllowedLateness: 0,
                                OnTimeBehavior:  
pipepb.OnTimeBehavior_FIRE_ALWAYS,
@@ -650,7 +779,9 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string 
{
        // GBK
 
        gbkOut := fmt.Sprintf("%v_out", nodeID(outNode))
-       m.makeNode(gbkOut, gbkCoderID, outNode)
+       if _, err := m.makeNode(gbkOut, gbkCoderID, outNode); err != nil {
+               return handleErr(err)
+       }
 
        gbkID := fmt.Sprintf("%v_gbk", id)
        gbk := &pipepb.PTransform{
@@ -664,7 +795,10 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) 
string {
 
        // Expand
 
-       outPCol := m.addNode(outNode)
+       outPCol, err := m.addNode(outNode)
+       if err != nil {
+               return handleErr(err)
+       }
        m.pcollections[outPCol].WindowingStrategyId = preservedWSId
 
        outputID := fmt.Sprintf("%v_unreify", id)
@@ -699,27 +833,37 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) 
string {
                },
                EnvironmentId: m.addDefaultEnv(),
        }
-       return reshuffleID
+       return reshuffleID, nil
 }
 
-func (m *marshaller) addNode(n *graph.Node) string {
+func (m *marshaller) addNode(n *graph.Node) (string, error) {
        id := nodeID(n)
        if _, exists := m.pcollections[id]; exists {
-               return id
+               return id, nil
        }
        // TODO(herohde) 11/15/2017: expose UniqueName to user.
-       return m.makeNode(id, m.coders.Add(n.Coder), n)
+       cid, err := m.coders.Add(n.Coder)
+       if err != nil {
+               return "", err
+       }
+       return m.makeNode(id, cid, n)
 }
 
-func (m *marshaller) makeNode(id, cid string, n *graph.Node) string {
+func (m *marshaller) makeNode(id, cid string, n *graph.Node) (string, error) {
+       windowingStrategyId, err := 
m.addWindowingStrategy(n.WindowingStrategy())
+
+       if err != nil {
+               return "", errors.Wrapf(err, "failed to make node %v with node 
id %v", n, id)
+       }
+
        col := &pipepb.PCollection{
                UniqueName:          id,
                CoderId:             cid,
                IsBounded:           boolToBounded(n.Bounded()),
-               WindowingStrategyId: 
m.addWindowingStrategy(n.WindowingStrategy()),
+               WindowingStrategyId: windowingStrategyId,
        }
        m.pcollections[id] = col
-       return id
+       return id, nil
 }
 
 func boolToBounded(bounded bool) pipepb.IsBounded_Enum {
@@ -737,10 +881,13 @@ func (m *marshaller) addDefaultEnv() string {
        return id
 }
 
-func (m *marshaller) addWindowingStrategy(w *window.WindowingStrategy) string {
-       ws := marshalWindowingStrategy(m.coders, w)
+func (m *marshaller) addWindowingStrategy(w *window.WindowingStrategy) 
(string, error) {
+       ws, err := marshalWindowingStrategy(m.coders, w)
+       if err != nil {
+               return "", errors.Wrapf(err, "failed to add window strategy 
%v", w)
+       }
        ws.EnvironmentId = m.addDefaultEnv()
-       return m.internWindowingStrategy(ws)
+       return m.internWindowingStrategy(ws), nil
 }
 
 func (m *marshaller) internWindowingStrategy(w *pipepb.WindowingStrategy) 
string {
@@ -757,12 +904,24 @@ func (m *marshaller) internWindowingStrategy(w 
*pipepb.WindowingStrategy) string
 
 // marshalWindowingStrategy marshals the given windowing strategy in
 // the given coder context.
-func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) 
*pipepb.WindowingStrategy {
+func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) 
(*pipepb.WindowingStrategy, error) {
+       windowFn, err := makeWindowFn(w.Fn)
+       if err != nil {
+               return nil, err
+       }
+       coderId, err := makeWindowCoder(w.Fn)
+       if err != nil {
+               return nil, err
+       }
+       windowCoderId, err := c.AddWindowCoder(coderId)
+       if err != nil {
+               return nil, err
+       }
        ws := &pipepb.WindowingStrategy{
-               WindowFn:         makeWindowFn(w.Fn),
+               WindowFn:         windowFn,
                MergeStatus:      pipepb.MergeStatus_NON_MERGING,
                AccumulationMode: pipepb.AccumulationMode_DISCARDING,
-               WindowCoderId:    c.AddWindowCoder(makeWindowCoder(w.Fn)),
+               WindowCoderId:    windowCoderId,
                Trigger: &pipepb.Trigger{
                        Trigger: &pipepb.Trigger_Default_{
                                Default: &pipepb.Trigger_Default{},
@@ -773,15 +932,15 @@ func marshalWindowingStrategy(c *CoderMarshaller, w 
*window.WindowingStrategy) *
                AllowedLateness: 0,
                OnTimeBehavior:  pipepb.OnTimeBehavior_FIRE_ALWAYS,
        }
-       return ws
+       return ws, nil
 }
 
-func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
+func makeWindowFn(w *window.Fn) (*pipepb.FunctionSpec, error) {
        switch w.Kind {
        case window.GlobalWindows:
                return &pipepb.FunctionSpec{
                        Urn: URNGlobalWindowsWindowFn,
-               }
+               }, nil
        case window.FixedWindows:
                return &pipepb.FunctionSpec{
                        Urn: URNFixedWindowsWindowFn,
@@ -790,7 +949,7 @@ func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
                                        Size: ptypes.DurationProto(w.Size),
                                },
                        ),
-               }
+               }, nil
        case window.SlidingWindows:
                return &pipepb.FunctionSpec{
                        Urn: URNSlidingWindowsWindowFn,
@@ -800,7 +959,7 @@ func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
                                        Period: ptypes.DurationProto(w.Period),
                                },
                        ),
-               }
+               }, nil
        case window.Sessions:
                return &pipepb.FunctionSpec{
                        Urn: URNSessionsWindowFn,
@@ -809,32 +968,32 @@ func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
                                        GapSize: ptypes.DurationProto(w.Gap),
                                },
                        ),
-               }
+               }, nil
        default:
-               panic(fmt.Sprintf("Unexpected windowing strategy: %v", w))
+               return nil, errors.Errorf("unexpected windowing strategy: %v", 
w)
        }
 }
 
-func makeWindowCoder(w *window.Fn) *coder.WindowCoder {
+func makeWindowCoder(w *window.Fn) (*coder.WindowCoder, error) {
        switch w.Kind {
        case window.GlobalWindows:
-               return coder.NewGlobalWindow()
+               return coder.NewGlobalWindow(), nil
        case window.FixedWindows, window.SlidingWindows, 
URNSlidingWindowsWindowFn:
-               return coder.NewIntervalWindow()
+               return coder.NewIntervalWindow(), nil
        default:
-               panic(fmt.Sprintf("Unexpected windowing strategy: %v", w))
+               return nil, errors.Errorf("unexpected windowing strategy: %v", 
w)
        }
 }
 
-func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) string {
+func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) (string, error) {
        ref, err := EncodeMultiEdge(edge)
        if err != nil {
-               panic(errors.Wrapf(err, "Failed to serialize %v", edge))
+               return "", errors.Wrapf(err, "failed to serialize %v", edge)
        }
        return protox.MustEncodeBase64(&v1pb.TransformPayload{
                Urn:  URNDoFn,
                Edge: ref,
-       })
+       }), nil
 }
 
 // makeBytesKeyedCoder returns KV<[]byte,A,> for any coder,
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/xlang.go 
b/sdks/go/pkg/beam/core/runtime/graphx/xlang.go
index cc1bb71..3311e46 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/xlang.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/xlang.go
@@ -23,20 +23,20 @@ import (
 
 // ExpandedComponents type asserts the Components field with interface{} type
 // and returns its pipeline component proto representation
-func ExpandedComponents(exp *graph.ExpandedTransform) *pipepb.Components {
+func ExpandedComponents(exp *graph.ExpandedTransform) (*pipepb.Components, 
error) {
        if c, ok := exp.Components.(*pipepb.Components); ok {
-               return c
+               return c, nil
        }
-       panic(errors.Errorf("malformed components; %v lacks a conforming 
pipeline component", exp))
+       return nil, errors.Errorf("malformed components; %v lacks a conforming 
pipeline component", exp)
 }
 
 // ExpandedTransform type asserts the Transform field with interface{} type
 // and returns its pipeline ptransform proto representation
-func ExpandedTransform(exp *graph.ExpandedTransform) *pipepb.PTransform {
+func ExpandedTransform(exp *graph.ExpandedTransform) (*pipepb.PTransform, 
error) {
        if t, ok := exp.Transform.(*pipepb.PTransform); ok {
-               return t
+               return t, nil
        }
-       panic(errors.Errorf("malformed transform; %v lacks a conforming 
pipeline ptransform", exp))
+       return nil, errors.Errorf("malformed transform; %v lacks a conforming 
pipeline ptransform", exp)
 }
 
 // ExternalInputs returns the map (tag -> graph node representing the
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go 
b/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
index 98e415b..a71916e 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
@@ -156,7 +156,10 @@ func TestExpandedTransform(t *testing.T) {
                want := newTransform("x")
                exp := &graph.ExpandedTransform{Transform: want}
 
-               got := ExpandedTransform(exp)
+               got, err := ExpandedTransform(exp)
+               if err != nil {
+                       t.Fatal(err)
+               }
 
                if d := cmp.Diff(want, got, protocmp.Transform()); d != "" {
                        t.Errorf("diff (-want, +got): %v", d)
@@ -165,9 +168,12 @@ func TestExpandedTransform(t *testing.T) {
        })
 
        t.Run("Malformed PTransform", func(t *testing.T) {
-               defer expectPanic(t, "string can't be type asserted into a 
pipeline PTransform")
                exp := &graph.ExpandedTransform{Transform: "gibberish"}
-               ExpandedTransform(exp)
+               expectedError := fmt.Sprintf("malformed transform; %v lacks a 
conforming pipeline ptransform", exp)
+               if _, actualError := ExpandedTransform(exp); 
actualError.Error() != expectedError {
+                       t.Errorf("got error %v, want error %v", actualError, 
expectedError)
+               }
+
        })
 }
 
@@ -176,7 +182,11 @@ func TestExpandedComponents(t *testing.T) {
                want := newComponents([]string{"x"})
                exp := &graph.ExpandedTransform{Components: want}
 
-               got := ExpandedComponents(exp)
+               got, err := ExpandedComponents(exp)
+
+               if err != nil {
+                       t.Fatal(err)
+               }
 
                if d := cmp.Diff(want, got, protocmp.Transform()); d != "" {
                        t.Errorf("diff (-want, +got): %v", d)
@@ -185,8 +195,11 @@ func TestExpandedComponents(t *testing.T) {
        })
 
        t.Run("Malformed Components", func(t *testing.T) {
-               defer expectPanic(t, "string can't be type asserted into a 
pipeline Components")
                exp := &graph.ExpandedTransform{Transform: "gibberish"}
-               ExpandedComponents(exp)
+               expectedError := fmt.Sprintf("malformed components; %v lacks a 
conforming pipeline component", exp)
+               if _, actualError := ExpandedComponents(exp); 
actualError.Error() != expectedError {
+                       t.Errorf("got error %v, want error %v", actualError, 
expectedError)
+               }
+
        })
 }
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go 
b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
index ec3c552..87dba0c 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
@@ -35,7 +35,11 @@ func ResolveArtifacts(ctx context.Context, edges 
[]*graph.MultiEdge, p *pipepb.P
        }
        for _, e := range edges {
                if e.Op == graph.External {
-                       envs := 
graphx.ExpandedComponents(e.External.Expanded).Environments
+                       components, err := 
graphx.ExpandedComponents(e.External.Expanded)
+                       if err != nil {
+                               panic(err)
+                       }
+                       envs := components.Environments
                        for eid, env := range envs {
 
                                if strings.HasPrefix(eid, "go") {
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/translate.go 
b/sdks/go/pkg/beam/core/runtime/xlangx/translate.go
index 5a456dc..fb5abef 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/translate.go
@@ -36,7 +36,10 @@ func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p 
*pipepb.Pipeline) {
                        p.Requirements = append(p.Requirements, 
exp.Requirements...)
 
                        // Adding components of the Expanded Transforms to the 
current Pipeline
-                       components := graphx.ExpandedComponents(exp)
+                       components, err := graphx.ExpandedComponents(exp)
+                       if err != nil {
+                               panic(err)
+                       }
                        for k, v := range components.GetTransforms() {
                                p.Components.Transforms[k] = v
                        }
@@ -60,8 +63,11 @@ func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p 
*pipepb.Pipeline) {
                                p.Components.Environments[k] = v
                        }
 
-                       p.Components.Transforms[id] = 
graphx.ExpandedTransform(exp)
-
+                       transform, err := graphx.ExpandedTransform(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       p.Components.Transforms[id] = transform
                }
        }
 }
@@ -79,7 +85,11 @@ func PurgeOutputInput(edges []*graph.MultiEdge, p 
*pipepb.Pipeline) {
                        for tag, n := range graphx.ExternalOutputs(e) {
                                nodeID := fmt.Sprintf("n%v", n.ID())
 
-                               expandedOutputs := 
graphx.ExpandedTransform(e.External.Expanded).GetOutputs()
+                               transform, err := 
graphx.ExpandedTransform(e.External.Expanded)
+                               if err != nil {
+                                       panic(err)
+                               }
+                               expandedOutputs := transform.GetOutputs()
                                var pcolID string
                                if tag == graph.SinkOutputTag {
                                        for _, pcolID = range expandedOutputs {
@@ -109,7 +119,11 @@ func PurgeOutputInput(edges []*graph.MultiEdge, p 
*pipepb.Pipeline) {
 
 // VerifyNamedOutputs ensures the expanded outputs correspond to the correct 
and expected named outputs
 func VerifyNamedOutputs(ext *graph.ExternalTransform) {
-       expandedOutputs := graphx.ExpandedTransform(ext.Expanded).GetOutputs()
+       transform, err := graphx.ExpandedTransform(ext.Expanded)
+       if err != nil {
+               panic(err)
+       }
+       expandedOutputs := transform.GetOutputs()
 
        if len(expandedOutputs) != len(ext.OutputsMap) {
                panic(errors.Errorf("mismatched number of named 
outputs:\nreceived - %v\nexpected - %v", len(expandedOutputs), 
len(ext.OutputsMap)))
@@ -131,8 +145,16 @@ func VerifyNamedOutputs(ext *graph.ExternalTransform) {
 func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater 
func(*graph.Node, bool)) {
        ext := e.External
        exp := ext.Expanded
-       expandedPCollections := graphx.ExpandedComponents(exp).GetPcollections()
-       expandedOutputs := graphx.ExpandedTransform(exp).GetOutputs()
+       components, err := graphx.ExpandedComponents(exp)
+       if err != nil {
+               panic(err)
+       }
+       expandedPCollections := components.GetPcollections()
+       transform, err := graphx.ExpandedTransform(exp)
+       if err != nil {
+               panic(err)
+       }
+       expandedOutputs := transform.GetOutputs()
 
        for tag, node := range graphx.ExternalOutputs(e) {
                var id string
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index f739da7..d830cc5 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -171,8 +171,11 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        if err != nil {
                return err
        }
-       model, err := graphx.Marshal(edges, &graphx.Options{Environment: 
graphx.CreateEnvironment(
-               ctx, jobopts.GetEnvironmentUrn(ctx), getContainerImage)})
+       enviroment, err := graphx.CreateEnvironment(ctx, 
jobopts.GetEnvironmentUrn(ctx), getContainerImage)
+       if err != nil {
+               return errors.WithContext(err, "generating model pipeline")
+       }
+       model, err := graphx.Marshal(edges, &graphx.Options{Environment: 
enviroment})
        if err != nil {
                return errors.WithContext(err, "generating model pipeline")
        }
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go 
b/sdks/go/pkg/beam/runners/universal/universal.go
index 940f852..fac57b7 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -77,8 +77,11 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                getEnvCfg = srv.EnvironmentConfig
        }
 
-       pipeline, err := graphx.Marshal(edges, &graphx.Options{Environment: 
graphx.CreateEnvironment(
-               ctx, envUrn, getEnvCfg)})
+       enviroment, err := graphx.CreateEnvironment(ctx, envUrn, getEnvCfg)
+       if err != nil {
+               return errors.WithContextf(err, "generating model pipeline")
+       }
+       pipeline, err := graphx.Marshal(edges, &graphx.Options{Environment: 
enviroment})
        if err != nil {
                return errors.WithContextf(err, "generating model pipeline")
        }

Reply via email to