This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/lostluck-protosuffix by this 
push:
     new a4d5648  Update translate.go
a4d5648 is described below

commit a4d5648b4d910066a0470f7c13e60d18f1eac7f4
Author: Robert Burke <[email protected]>
AuthorDate: Tue Apr 7 22:50:51 2020 -0700

    Update translate.go
---
 sdks/go/pkg/beam/core/runtime/graphx/translate.go | 200 +++++++++++-----------
 1 file changed, 100 insertions(+), 100 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index f508183..77981a4 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -26,7 +26,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
-       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/golang/protobuf/proto"
        "github.com/golang/protobuf/ptypes"
 )
@@ -76,7 +76,7 @@ func goCapabilities() []string {
        return append(capabilities, knownStandardCoders()...)
 }
 
-func CreateEnvironment(ctx context.Context, urn string, 
extractEnvironmentConfig func(context.Context) string) *pb.Environment {
+func CreateEnvironment(ctx context.Context, urn string, 
extractEnvironmentConfig func(context.Context) string) *pipepb.Environment {
        switch urn {
        case "beam:env:process:v1":
                // TODO Support process based SDK Harness.
@@ -85,13 +85,13 @@ func CreateEnvironment(ctx context.Context, urn string, 
extractEnvironmentConfig
                fallthrough
        default:
                config := extractEnvironmentConfig(ctx)
-               payload := &pb.DockerPayload{ContainerImage: config}
+               payload := &pipepb.DockerPayload{ContainerImage: config}
                serializedPayload, err := proto.Marshal(payload)
                if err != nil {
                        panic(fmt.Sprintf(
                                "Failed to serialize Environment payload %v for 
config %v: %v", payload, config, err))
                }
-               return &pb.Environment{
+               return &pipepb.Environment{
                        Urn:          urn,
                        Payload:      serializedPayload,
                        Capabilities: goCapabilities(),
@@ -104,11 +104,11 @@ func CreateEnvironment(ctx context.Context, urn string, 
extractEnvironmentConfig
 // Options for marshalling a graph into a model pipeline.
 type Options struct {
        // Environment used to run the user code.
-       Environment *pb.Environment
+       Environment *pipepb.Environment
 }
 
 // Marshal converts a graph to a model pipeline.
-func Marshal(edges []*graph.MultiEdge, opt *Options) (*pb.Pipeline, error) {
+func Marshal(edges []*graph.MultiEdge, opt *Options) (*pipepb.Pipeline, error) 
{
        tree := NewScopeTree(edges)
 
        m := newMarshaller(opt)
@@ -119,7 +119,7 @@ func Marshal(edges []*graph.MultiEdge, opt *Options) 
(*pb.Pipeline, error) {
                m.addScopeTree(t)
        }
 
-       p := &pb.Pipeline{
+       p := &pipepb.Pipeline{
                Components: m.build(),
        }
        return pipelinex.Normalize(p)
@@ -128,10 +128,10 @@ func Marshal(edges []*graph.MultiEdge, opt *Options) 
(*pb.Pipeline, error) {
 type marshaller struct {
        opt *Options
 
-       transforms   map[string]*pb.PTransform
-       pcollections map[string]*pb.PCollection
-       windowing    map[string]*pb.WindowingStrategy
-       environments map[string]*pb.Environment
+       transforms   map[string]*pipepb.PTransform
+       pcollections map[string]*pipepb.PCollection
+       windowing    map[string]*pipepb.WindowingStrategy
+       environments map[string]*pipepb.Environment
 
        coders *CoderMarshaller
 
@@ -141,17 +141,17 @@ type marshaller struct {
 func newMarshaller(opt *Options) *marshaller {
        return &marshaller{
                opt:          opt,
-               transforms:   make(map[string]*pb.PTransform),
-               pcollections: make(map[string]*pb.PCollection),
-               windowing:    make(map[string]*pb.WindowingStrategy),
-               environments: make(map[string]*pb.Environment),
+               transforms:   make(map[string]*pipepb.PTransform),
+               pcollections: make(map[string]*pipepb.PCollection),
+               windowing:    make(map[string]*pipepb.WindowingStrategy),
+               environments: make(map[string]*pipepb.Environment),
                coders:       NewCoderMarshaller(),
                windowing2id: make(map[string]string),
        }
 }
 
-func (m *marshaller) build() *pb.Components {
-       return &pb.Components{
+func (m *marshaller) build() *pipepb.Components {
+       return &pipepb.Components{
                Transforms:          m.transforms,
                Pcollections:        m.pcollections,
                WindowingStrategies: m.windowing,
@@ -174,7 +174,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
                subtransforms = append(subtransforms, m.addScopeTree(tree))
        }
 
-       transform := &pb.PTransform{
+       transform := &pipepb.PTransform{
                UniqueName:    s.Scope.Name,
                Subtransforms: subtransforms,
        }
@@ -189,7 +189,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
 // to be a CombinePerKey with a CombinePayload if it's a liftable composite.
 // Beam Portability requires that composites contain an implementation for 
runners
 // that don't understand the URN and Payload, which this lightly checks for.
-func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform 
*pb.PTransform) {
+func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform 
*pipepb.PTransform) {
        if s.Scope.Name != graph.CombinePerKeyScope ||
                len(s.Edges) != 2 ||
                len(s.Edges[0].Edge.Input) != 1 ||
@@ -200,14 +200,14 @@ func (m *marshaller) updateIfCombineComposite(s 
*ScopeTree, transform *pb.PTrans
 
        edge := s.Edges[1].Edge
        acID := m.coders.Add(edge.AccumCoder)
-       payload := &pb.CombinePayload{
-               CombineFn: &pb.FunctionSpec{
+       payload := &pipepb.CombinePayload{
+               CombineFn: &pipepb.FunctionSpec{
                        Urn:     URNJavaDoFn,
                        Payload: []byte(mustEncodeMultiEdgeBase64(edge)),
                },
                AccumulatorCoderId: acID,
        }
-       transform.Spec = &pb.FunctionSpec{Urn: URNCombinePerKey, Payload: 
protox.MustEncode(payload)}
+       transform.Spec = &pipepb.FunctionSpec{Urn: URNCombinePerKey, Payload: 
protox.MustEncode(payload)}
        transform.EnvironmentId = m.addDefaultEnv()
 }
 
@@ -237,15 +237,15 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) 
[]string {
 
        // allPIds tracks additional PTransformIDs generated for the pipeline
        var allPIds []string
-       var spec *pb.FunctionSpec
+       var spec *pipepb.FunctionSpec
        var transformEnvID = ""
        switch edge.Edge.Op {
        case graph.Impulse:
                // TODO(herohde) 7/18/2018: Encode data?
-               spec = &pb.FunctionSpec{Urn: URNImpulse}
+               spec = &pipepb.FunctionSpec{Urn: URNImpulse}
 
        case graph.ParDo:
-               si := make(map[string]*pb.SideInput)
+               si := make(map[string]*pipepb.SideInput)
                for i, in := range edge.Edge.Input {
                        switch in.Kind {
                        case graph.Main:
@@ -259,8 +259,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
                                out := fmt.Sprintf("%v_keyed%v_%v", 
nodeID(in.From), edgeID(edge.Edge), i)
                                m.makeNode(out, 
m.coders.Add(makeBytesKeyedCoder(in.From.Coder)), in.From)
 
-                               payload := &pb.ParDoPayload{
-                                       DoFn: &pb.FunctionSpec{
+                               payload := &pipepb.ParDoPayload{
+                                       DoFn: &pipepb.FunctionSpec{
                                                Urn: URNIterableSideInputKey,
                                                Payload: 
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
                                                        Urn: 
URNIterableSideInputKey,
@@ -269,9 +269,9 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
                                }
 
                                keyedID := fmt.Sprintf("%v_keyed%v", 
edgeID(edge.Edge), i)
-                               keyed := &pb.PTransform{
+                               keyed := &pipepb.PTransform{
                                        UniqueName: keyedID,
-                                       Spec: &pb.FunctionSpec{
+                                       Spec: &pipepb.FunctionSpec{
                                                Urn:     URNParDo,
                                                Payload: 
protox.MustEncode(payload),
                                        },
@@ -285,14 +285,14 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) 
[]string {
                                // Fixup input map
                                inputs[fmt.Sprintf("i%v", i)] = out
 
-                               si[fmt.Sprintf("i%v", i)] = &pb.SideInput{
-                                       AccessPattern: &pb.FunctionSpec{
+                               si[fmt.Sprintf("i%v", i)] = &pipepb.SideInput{
+                                       AccessPattern: &pipepb.FunctionSpec{
                                                Urn: URNMultimapSideInput,
                                        },
-                                       ViewFn: &pb.FunctionSpec{
+                                       ViewFn: &pipepb.FunctionSpec{
                                                Urn: "foo",
                                        },
-                                       WindowMappingFn: &pb.FunctionSpec{
+                                       WindowMappingFn: &pipepb.FunctionSpec{
                                                Urn: "bar",
                                        },
                                }
@@ -305,8 +305,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
                        }
                }
 
-               payload := &pb.ParDoPayload{
-                       DoFn: &pb.FunctionSpec{
+               payload := &pipepb.ParDoPayload{
+                       DoFn: &pipepb.FunctionSpec{
                                Urn:     URNJavaDoFn,
                                Payload: 
[]byte(mustEncodeMultiEdgeBase64(edge.Edge)),
                        },
@@ -316,38 +316,38 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) 
[]string {
                        payload.RestrictionCoderId = 
m.coders.Add(edge.Edge.RestrictionCoder)
                }
                transformEnvID = m.addDefaultEnv()
-               spec = &pb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
+               spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
 
        case graph.Combine:
-               payload := &pb.ParDoPayload{
+               payload := &pipepb.ParDoPayload{
                        DoFn: &pb.FunctionSpec{
                                Urn:     URNJavaDoFn,
                                Payload: 
[]byte(mustEncodeMultiEdgeBase64(edge.Edge)),
                        },
                }
                transformEnvID = m.addDefaultEnv()
-               spec = &pb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
+               spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
 
        case graph.Flatten:
-               spec = &pb.FunctionSpec{Urn: URNFlatten}
+               spec = &pipepb.FunctionSpec{Urn: URNFlatten}
 
        case graph.CoGBK:
-               spec = &pb.FunctionSpec{Urn: URNGBK}
+               spec = &pipepb.FunctionSpec{Urn: URNGBK}
 
        case graph.WindowInto:
-               payload := &pb.WindowIntoPayload{
+               payload := &pipepb.WindowIntoPayload{
                        WindowFn: makeWindowFn(edge.Edge.WindowFn),
                }
-               spec = &pb.FunctionSpec{Urn: URNWindow, Payload: 
protox.MustEncode(payload)}
+               spec = &pipepb.FunctionSpec{Urn: URNWindow, Payload: 
protox.MustEncode(payload)}
 
        case graph.External:
-               spec = &pb.FunctionSpec{Urn: edge.Edge.Payload.URN, Payload: 
edge.Edge.Payload.Data}
+               spec = &pipepb.FunctionSpec{Urn: edge.Edge.Payload.URN, 
Payload: edge.Edge.Payload.Data}
 
        default:
                panic(fmt.Sprintf("Unexpected opcode: %v", edge.Edge.Op))
        }
 
-       transform := &pb.PTransform{
+       transform := &pipepb.PTransform{
                UniqueName:    edge.Name,
                Spec:          spec,
                Inputs:        inputs,
@@ -379,8 +379,8 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
                // Inject(i)
 
                injectID := fmt.Sprintf("%v_inject%v", id, i)
-               payload := &pb.ParDoPayload{
-                       DoFn: &pb.FunctionSpec{
+               payload := &pipepb.ParDoPayload{
+                       DoFn: &pipepb.FunctionSpec{
                                Urn: URNInject,
                                Payload: 
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
                                        Urn:    URNInject,
@@ -388,9 +388,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
                                })),
                        },
                }
-               inject := &pb.PTransform{
+               inject := &pipepb.PTransform{
                        UniqueName: injectID,
-                       Spec: &pb.FunctionSpec{
+                       Spec: &pipepb.FunctionSpec{
                                Urn:     URNParDo,
                                Payload: protox.MustEncode(payload),
                        },
@@ -412,9 +412,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
        m.makeNode(out, kvCoderID, outNode)
 
        flattenID := fmt.Sprintf("%v_flatten", id)
-       flatten := &pb.PTransform{
+       flatten := &pipepb.PTransform{
                UniqueName: flattenID,
-               Spec:       &pb.FunctionSpec{Urn: URNFlatten},
+               Spec:       &pipepb.FunctionSpec{Urn: URNFlatten},
                Inputs:     inputs,
                Outputs:    map[string]string{"i0": out},
        }
@@ -427,9 +427,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
        m.makeNode(gbkOut, gbkCoderID, outNode)
 
        gbkID := fmt.Sprintf("%v_gbk", id)
-       gbk := &pb.PTransform{
+       gbk := &pipepb.PTransform{
                UniqueName: gbkID,
-               Spec:       &pb.FunctionSpec{Urn: URNGBK},
+               Spec:       &pipepb.FunctionSpec{Urn: URNGBK},
                Inputs:     map[string]string{"i0": out},
                Outputs:    map[string]string{"i0": gbkOut},
        }
@@ -441,17 +441,17 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
        m.addNode(outNode)
 
        expandID := fmt.Sprintf("%v_expand", id)
-       payload := &pb.ParDoPayload{
-               DoFn: &pb.FunctionSpec{
+       payload := &pipepb.ParDoPayload{
+               DoFn: &pipepb.FunctionSpec{
                        Urn: URNExpand,
                        Payload: 
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
                                Urn: URNExpand,
                        })),
                },
        }
-       expand := &pb.PTransform{
+       expand := &pipepb.PTransform{
                UniqueName: expandID,
-               Spec: &pb.FunctionSpec{
+               Spec: &pipepb.FunctionSpec{
                        Urn:     URNParDo,
                        Payload: protox.MustEncode(payload),
                },
@@ -465,7 +465,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
        // Add composite for visualization
 
        cogbkID := fmt.Sprintf("%v_cogbk", id)
-       m.transforms[cogbkID] = &pb.PTransform{
+       m.transforms[cogbkID] = &pipepb.PTransform{
                UniqueName:    edge.Name,
                Subtransforms: subtransforms,
        }
@@ -531,45 +531,45 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) 
string {
        {
                wfn := window.NewGlobalWindows()
                m.pcollections[postReify].WindowingStrategyId =
-                       m.internWindowingStrategy(&pb.WindowingStrategy{
+                       m.internWindowingStrategy(&pipepb.WindowingStrategy{
                                // Not segregated by time...
                                WindowFn: makeWindowFn(wfn),
                                // ...output after every element is received...
-                               Trigger: &pb.Trigger{
-                                       Trigger: &pb.Trigger_Always_{
-                                               Always: &pb.Trigger_Always{},
+                               Trigger: &pipepb.Trigger{
+                                       Trigger: &pipepb.Trigger_Always_{
+                                               Always: 
&pipepb.Trigger_Always{},
                                        },
                                },
                                // ...and after outputing, discard the output 
elements...
-                               AccumulationMode: 
pb.AccumulationMode_DISCARDING,
+                               AccumulationMode: 
pipepb.AccumulationMode_DISCARDING,
                                // ...and since every pane should have 1 
element,
                                // try to preserve the timestamp.
-                               OutputTime: pb.OutputTime_EARLIEST_IN_PANE,
+                               OutputTime: pipepb.OutputTime_EARLIEST_IN_PANE,
                                // Defaults copied from 
marshalWindowingStrategy.
                                // TODO(BEAM-3304): migrate to user side 
operations once trigger support is in.
                                EnvironmentId:   m.addDefaultEnv(),
-                               MergeStatus:     pb.MergeStatus_NON_MERGING,
+                               MergeStatus:     pipepb.MergeStatus_NON_MERGING,
                                WindowCoderId:   
m.coders.AddWindowCoder(makeWindowCoder(wfn)),
-                               ClosingBehavior: 
pb.ClosingBehavior_EMIT_IF_NONEMPTY,
+                               ClosingBehavior: 
pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
                                AllowedLateness: 0,
-                               OnTimeBehavior:  pb.OnTimeBehavior_FIRE_ALWAYS,
+                               OnTimeBehavior:  
pipepb.OnTimeBehavior_FIRE_ALWAYS,
                        })
        }
 
        // Inputs (i)
 
        inputID := fmt.Sprintf("%v_reifyts", id)
-       payload := &pb.ParDoPayload{
-               DoFn: &pb.FunctionSpec{
+       payload := &pipepb.ParDoPayload{
+               DoFn: &pipepb.FunctionSpec{
                        Urn: URNReshuffleInput,
                        Payload: 
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
                                Urn: URNReshuffleInput,
                        })),
                },
        }
-       input := &pb.PTransform{
+       input := &pipepb.PTransform{
                UniqueName: inputID,
-               Spec: &pb.FunctionSpec{
+               Spec: &pipepb.FunctionSpec{
                        Urn:     URNParDo,
                        Payload: protox.MustEncode(payload),
                },
@@ -588,9 +588,9 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string 
{
        m.makeNode(gbkOut, gbkCoderID, outNode)
 
        gbkID := fmt.Sprintf("%v_gbk", id)
-       gbk := &pb.PTransform{
+       gbk := &pipepb.PTransform{
                UniqueName: gbkID,
-               Spec:       &pb.FunctionSpec{Urn: URNGBK},
+               Spec:       &pipepb.FunctionSpec{Urn: URNGBK},
                Inputs:     map[string]string{"i0": postReify},
                Outputs:    map[string]string{"i0": gbkOut},
        }
@@ -603,17 +603,17 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) 
string {
        m.pcollections[outPCol].WindowingStrategyId = preservedWSId
 
        outputID := fmt.Sprintf("%v_unreify", id)
-       outputPayload := &pb.ParDoPayload{
-               DoFn: &pb.FunctionSpec{
+       outputPayload := &pipepb.ParDoPayload{
+               DoFn: &pipepb.FunctionSpec{
                        Urn: URNReshuffleOutput,
                        Payload: 
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
                                Urn: URNReshuffleOutput,
                        })),
                },
        }
-       output := &pb.PTransform{
+       output := &pipepb.PTransform{
                UniqueName: outputID,
-               Spec: &pb.FunctionSpec{
+               Spec: &pipepb.FunctionSpec{
                        Urn:     URNParDo,
                        Payload: protox.MustEncode(outputPayload),
                },
@@ -626,10 +626,10 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) 
string {
 
        // Add composite for visualization, or runner optimization
        reshuffleID := fmt.Sprintf("%v_reshuffle", id)
-       m.transforms[reshuffleID] = &pb.PTransform{
+       m.transforms[reshuffleID] = &pipepb.PTransform{
                UniqueName:    edge.Name,
                Subtransforms: subtransforms,
-               Spec: &pb.FunctionSpec{
+               Spec: &pipepb.FunctionSpec{
                        Urn: URNReshuffle,
                },
        }
@@ -637,7 +637,7 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string 
{
 }
 
 func (m *marshaller) makeNode(id, cid string, n *graph.Node) string {
-       col := &pb.PCollection{
+       col := &pipepb.PCollection{
                UniqueName:          id,
                CoderId:             cid,
                IsBounded:           boolToBounded(n.Bounded()),
@@ -647,11 +647,11 @@ func (m *marshaller) makeNode(id, cid string, n 
*graph.Node) string {
        return id
 }
 
-func boolToBounded(bounded bool) pb.IsBounded_Enum {
+func boolToBounded(bounded bool) pipepb.IsBounded_Enum {
        if bounded {
-               return pb.IsBounded_BOUNDED
+               return pipepb.IsBounded_BOUNDED
        }
-       return pb.IsBounded_UNBOUNDED
+       return pipepb.IsBounded_UNBOUNDED
 }
 
 func (m *marshaller) addDefaultEnv() string {
@@ -668,7 +668,7 @@ func (m *marshaller) addWindowingStrategy(w 
*window.WindowingStrategy) string {
        return m.internWindowingStrategy(ws)
 }
 
-func (m *marshaller) internWindowingStrategy(w *pb.WindowingStrategy) string {
+func (m *marshaller) internWindowingStrategy(w *pipepb.WindowingStrategy) 
string {
        key := proto.MarshalTextString(w)
        if id, exists := m.windowing2id[key]; exists {
                return id
@@ -682,21 +682,21 @@ func (m *marshaller) internWindowingStrategy(w 
*pb.WindowingStrategy) string {
 
 // marshalWindowingStrategy marshals the given windowing strategy in
 // the given coder context.
-func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) 
*pb.WindowingStrategy {
-       ws := &pb.WindowingStrategy{
+func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) 
*pipepb.WindowingStrategy {
+       ws := &pipepb.WindowingStrategy{
                WindowFn:         makeWindowFn(w.Fn),
-               MergeStatus:      pb.MergeStatus_NON_MERGING,
-               AccumulationMode: pb.AccumulationMode_DISCARDING,
+               MergeStatus:      pipepb.MergeStatus_NON_MERGING,
+               AccumulationMode: pipepb.AccumulationMode_DISCARDING,
                WindowCoderId:    c.AddWindowCoder(makeWindowCoder(w.Fn)),
-               Trigger: &pb.Trigger{
-                       Trigger: &pb.Trigger_Default_{
-                               Default: &pb.Trigger_Default{},
+               Trigger: &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_Default_{
+                               Default: &pipepb.Trigger_Default{},
                        },
                },
-               OutputTime:      pb.OutputTime_END_OF_WINDOW,
-               ClosingBehavior: pb.ClosingBehavior_EMIT_IF_NONEMPTY,
+               OutputTime:      pipepb.OutputTime_END_OF_WINDOW,
+               ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
                AllowedLateness: 0,
-               OnTimeBehavior:  pb.OnTimeBehavior_FIRE_ALWAYS,
+               OnTimeBehavior:  pipepb.OnTimeBehavior_FIRE_ALWAYS,
        }
        return ws
 }
@@ -704,33 +704,33 @@ func marshalWindowingStrategy(c *CoderMarshaller, w 
*window.WindowingStrategy) *
 func makeWindowFn(w *window.Fn) *pb.FunctionSpec {
        switch w.Kind {
        case window.GlobalWindows:
-               return &pb.FunctionSpec{
+               return &pipepb.FunctionSpec{
                        Urn: URNGlobalWindowsWindowFn,
                }
        case window.FixedWindows:
-               return &pb.FunctionSpec{
+               return &pipepb.FunctionSpec{
                        Urn: URNFixedWindowsWindowFn,
                        Payload: protox.MustEncode(
-                               &pb.FixedWindowsPayload{
+                               &pipepb.FixedWindowsPayload{
                                        Size: ptypes.DurationProto(w.Size),
                                },
                        ),
                }
        case window.SlidingWindows:
-               return &pb.FunctionSpec{
+               return &pipepb.FunctionSpec{
                        Urn: URNSlidingWindowsWindowFn,
                        Payload: protox.MustEncode(
-                               &pb.SlidingWindowsPayload{
+                               &pipepb.SlidingWindowsPayload{
                                        Size:   ptypes.DurationProto(w.Size),
                                        Period: ptypes.DurationProto(w.Period),
                                },
                        ),
                }
        case window.Sessions:
-               return &pb.FunctionSpec{
+               return &pipepb.FunctionSpec{
                        Urn: URNSessionsWindowFn,
                        Payload: protox.MustEncode(
-                               &pb.SessionWindowsPayload{
+                               &pipepb.SessionWindowsPayload{
                                        GapSize: ptypes.DurationProto(w.Gap),
                                },
                        ),

Reply via email to