This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/lostluck-protosuffix by this
push:
new a4d5648 Update translate.go
a4d5648 is described below
commit a4d5648b4d910066a0470f7c13e60d18f1eac7f4
Author: Robert Burke <[email protected]>
AuthorDate: Tue Apr 7 22:50:51 2020 -0700
Update translate.go
---
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 200 +++++++++++-----------
1 file changed, 100 insertions(+), 100 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index f508183..77981a4 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -26,7 +26,7 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
- pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+ pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
)
@@ -76,7 +76,7 @@ func goCapabilities() []string {
return append(capabilities, knownStandardCoders()...)
}
-func CreateEnvironment(ctx context.Context, urn string,
extractEnvironmentConfig func(context.Context) string) *pb.Environment {
+func CreateEnvironment(ctx context.Context, urn string,
extractEnvironmentConfig func(context.Context) string) *pipepb.Environment {
switch urn {
case "beam:env:process:v1":
// TODO Support process based SDK Harness.
@@ -85,13 +85,13 @@ func CreateEnvironment(ctx context.Context, urn string,
extractEnvironmentConfig
fallthrough
default:
config := extractEnvironmentConfig(ctx)
- payload := &pb.DockerPayload{ContainerImage: config}
+ payload := &pipepb.DockerPayload{ContainerImage: config}
serializedPayload, err := proto.Marshal(payload)
if err != nil {
panic(fmt.Sprintf(
"Failed to serialize Environment payload %v for
config %v: %v", payload, config, err))
}
- return &pb.Environment{
+ return &pipepb.Environment{
Urn: urn,
Payload: serializedPayload,
Capabilities: goCapabilities(),
@@ -104,11 +104,11 @@ func CreateEnvironment(ctx context.Context, urn string,
extractEnvironmentConfig
// Options for marshalling a graph into a model pipeline.
type Options struct {
// Environment used to run the user code.
- Environment *pb.Environment
+ Environment *pipepb.Environment
}
// Marshal converts a graph to a model pipeline.
-func Marshal(edges []*graph.MultiEdge, opt *Options) (*pb.Pipeline, error) {
+func Marshal(edges []*graph.MultiEdge, opt *Options) (*pipepb.Pipeline, error)
{
tree := NewScopeTree(edges)
m := newMarshaller(opt)
@@ -119,7 +119,7 @@ func Marshal(edges []*graph.MultiEdge, opt *Options)
(*pb.Pipeline, error) {
m.addScopeTree(t)
}
- p := &pb.Pipeline{
+ p := &pipepb.Pipeline{
Components: m.build(),
}
return pipelinex.Normalize(p)
@@ -128,10 +128,10 @@ func Marshal(edges []*graph.MultiEdge, opt *Options)
(*pb.Pipeline, error) {
type marshaller struct {
opt *Options
- transforms map[string]*pb.PTransform
- pcollections map[string]*pb.PCollection
- windowing map[string]*pb.WindowingStrategy
- environments map[string]*pb.Environment
+ transforms map[string]*pipepb.PTransform
+ pcollections map[string]*pipepb.PCollection
+ windowing map[string]*pipepb.WindowingStrategy
+ environments map[string]*pipepb.Environment
coders *CoderMarshaller
@@ -141,17 +141,17 @@ type marshaller struct {
func newMarshaller(opt *Options) *marshaller {
return &marshaller{
opt: opt,
- transforms: make(map[string]*pb.PTransform),
- pcollections: make(map[string]*pb.PCollection),
- windowing: make(map[string]*pb.WindowingStrategy),
- environments: make(map[string]*pb.Environment),
+ transforms: make(map[string]*pipepb.PTransform),
+ pcollections: make(map[string]*pipepb.PCollection),
+ windowing: make(map[string]*pipepb.WindowingStrategy),
+ environments: make(map[string]*pipepb.Environment),
coders: NewCoderMarshaller(),
windowing2id: make(map[string]string),
}
}
-func (m *marshaller) build() *pb.Components {
- return &pb.Components{
+func (m *marshaller) build() *pipepb.Components {
+ return &pipepb.Components{
Transforms: m.transforms,
Pcollections: m.pcollections,
WindowingStrategies: m.windowing,
@@ -174,7 +174,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
subtransforms = append(subtransforms, m.addScopeTree(tree))
}
- transform := &pb.PTransform{
+ transform := &pipepb.PTransform{
UniqueName: s.Scope.Name,
Subtransforms: subtransforms,
}
@@ -189,7 +189,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
// to be a CombinePerKey with a CombinePayload if it's a liftable composite.
// Beam Portability requires that composites contain an implementation for
runners
// that don't understand the URN and Payload, which this lightly checks for.
-func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform
*pb.PTransform) {
+func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform
*pipepb.PTransform) {
if s.Scope.Name != graph.CombinePerKeyScope ||
len(s.Edges) != 2 ||
len(s.Edges[0].Edge.Input) != 1 ||
@@ -200,14 +200,14 @@ func (m *marshaller) updateIfCombineComposite(s
*ScopeTree, transform *pb.PTrans
edge := s.Edges[1].Edge
acID := m.coders.Add(edge.AccumCoder)
- payload := &pb.CombinePayload{
- CombineFn: &pb.FunctionSpec{
+ payload := &pipepb.CombinePayload{
+ CombineFn: &pipepb.FunctionSpec{
Urn: URNJavaDoFn,
Payload: []byte(mustEncodeMultiEdgeBase64(edge)),
},
AccumulatorCoderId: acID,
}
- transform.Spec = &pb.FunctionSpec{Urn: URNCombinePerKey, Payload:
protox.MustEncode(payload)}
+ transform.Spec = &pipepb.FunctionSpec{Urn: URNCombinePerKey, Payload:
protox.MustEncode(payload)}
transform.EnvironmentId = m.addDefaultEnv()
}
@@ -237,15 +237,15 @@ func (m *marshaller) addMultiEdge(edge NamedEdge)
[]string {
// allPIds tracks additional PTransformIDs generated for the pipeline
var allPIds []string
- var spec *pb.FunctionSpec
+ var spec *pipepb.FunctionSpec
var transformEnvID = ""
switch edge.Edge.Op {
case graph.Impulse:
// TODO(herohde) 7/18/2018: Encode data?
- spec = &pb.FunctionSpec{Urn: URNImpulse}
+ spec = &pipepb.FunctionSpec{Urn: URNImpulse}
case graph.ParDo:
- si := make(map[string]*pb.SideInput)
+ si := make(map[string]*pipepb.SideInput)
for i, in := range edge.Edge.Input {
switch in.Kind {
case graph.Main:
@@ -259,8 +259,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
out := fmt.Sprintf("%v_keyed%v_%v",
nodeID(in.From), edgeID(edge.Edge), i)
m.makeNode(out,
m.coders.Add(makeBytesKeyedCoder(in.From.Coder)), in.From)
- payload := &pb.ParDoPayload{
- DoFn: &pb.FunctionSpec{
+ payload := &pipepb.ParDoPayload{
+ DoFn: &pipepb.FunctionSpec{
Urn: URNIterableSideInputKey,
Payload:
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
Urn:
URNIterableSideInputKey,
@@ -269,9 +269,9 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
}
keyedID := fmt.Sprintf("%v_keyed%v",
edgeID(edge.Edge), i)
- keyed := &pb.PTransform{
+ keyed := &pipepb.PTransform{
UniqueName: keyedID,
- Spec: &pb.FunctionSpec{
+ Spec: &pipepb.FunctionSpec{
Urn: URNParDo,
Payload:
protox.MustEncode(payload),
},
@@ -285,14 +285,14 @@ func (m *marshaller) addMultiEdge(edge NamedEdge)
[]string {
// Fixup input map
inputs[fmt.Sprintf("i%v", i)] = out
- si[fmt.Sprintf("i%v", i)] = &pb.SideInput{
- AccessPattern: &pb.FunctionSpec{
+ si[fmt.Sprintf("i%v", i)] = &pipepb.SideInput{
+ AccessPattern: &pipepb.FunctionSpec{
Urn: URNMultimapSideInput,
},
- ViewFn: &pb.FunctionSpec{
+ ViewFn: &pipepb.FunctionSpec{
Urn: "foo",
},
- WindowMappingFn: &pb.FunctionSpec{
+ WindowMappingFn: &pipepb.FunctionSpec{
Urn: "bar",
},
}
@@ -305,8 +305,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
}
}
- payload := &pb.ParDoPayload{
- DoFn: &pb.FunctionSpec{
+ payload := &pipepb.ParDoPayload{
+ DoFn: &pipepb.FunctionSpec{
Urn: URNJavaDoFn,
Payload:
[]byte(mustEncodeMultiEdgeBase64(edge.Edge)),
},
@@ -316,38 +316,38 @@ func (m *marshaller) addMultiEdge(edge NamedEdge)
[]string {
payload.RestrictionCoderId =
m.coders.Add(edge.Edge.RestrictionCoder)
}
transformEnvID = m.addDefaultEnv()
- spec = &pb.FunctionSpec{Urn: URNParDo, Payload:
protox.MustEncode(payload)}
+ spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload:
protox.MustEncode(payload)}
case graph.Combine:
- payload := &pb.ParDoPayload{
+ payload := &pipepb.ParDoPayload{
DoFn: &pb.FunctionSpec{
Urn: URNJavaDoFn,
Payload:
[]byte(mustEncodeMultiEdgeBase64(edge.Edge)),
},
}
transformEnvID = m.addDefaultEnv()
- spec = &pb.FunctionSpec{Urn: URNParDo, Payload:
protox.MustEncode(payload)}
+ spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload:
protox.MustEncode(payload)}
case graph.Flatten:
- spec = &pb.FunctionSpec{Urn: URNFlatten}
+ spec = &pipepb.FunctionSpec{Urn: URNFlatten}
case graph.CoGBK:
- spec = &pb.FunctionSpec{Urn: URNGBK}
+ spec = &pipepb.FunctionSpec{Urn: URNGBK}
case graph.WindowInto:
- payload := &pb.WindowIntoPayload{
+ payload := &pipepb.WindowIntoPayload{
WindowFn: makeWindowFn(edge.Edge.WindowFn),
}
- spec = &pb.FunctionSpec{Urn: URNWindow, Payload:
protox.MustEncode(payload)}
+ spec = &pipepb.FunctionSpec{Urn: URNWindow, Payload:
protox.MustEncode(payload)}
case graph.External:
- spec = &pb.FunctionSpec{Urn: edge.Edge.Payload.URN, Payload:
edge.Edge.Payload.Data}
+ spec = &pipepb.FunctionSpec{Urn: edge.Edge.Payload.URN,
Payload: edge.Edge.Payload.Data}
default:
panic(fmt.Sprintf("Unexpected opcode: %v", edge.Edge.Op))
}
- transform := &pb.PTransform{
+ transform := &pipepb.PTransform{
UniqueName: edge.Name,
Spec: spec,
Inputs: inputs,
@@ -379,8 +379,8 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
// Inject(i)
injectID := fmt.Sprintf("%v_inject%v", id, i)
- payload := &pb.ParDoPayload{
- DoFn: &pb.FunctionSpec{
+ payload := &pipepb.ParDoPayload{
+ DoFn: &pipepb.FunctionSpec{
Urn: URNInject,
Payload:
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
Urn: URNInject,
@@ -388,9 +388,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
})),
},
}
- inject := &pb.PTransform{
+ inject := &pipepb.PTransform{
UniqueName: injectID,
- Spec: &pb.FunctionSpec{
+ Spec: &pipepb.FunctionSpec{
Urn: URNParDo,
Payload: protox.MustEncode(payload),
},
@@ -412,9 +412,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
m.makeNode(out, kvCoderID, outNode)
flattenID := fmt.Sprintf("%v_flatten", id)
- flatten := &pb.PTransform{
+ flatten := &pipepb.PTransform{
UniqueName: flattenID,
- Spec: &pb.FunctionSpec{Urn: URNFlatten},
+ Spec: &pipepb.FunctionSpec{Urn: URNFlatten},
Inputs: inputs,
Outputs: map[string]string{"i0": out},
}
@@ -427,9 +427,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
m.makeNode(gbkOut, gbkCoderID, outNode)
gbkID := fmt.Sprintf("%v_gbk", id)
- gbk := &pb.PTransform{
+ gbk := &pipepb.PTransform{
UniqueName: gbkID,
- Spec: &pb.FunctionSpec{Urn: URNGBK},
+ Spec: &pipepb.FunctionSpec{Urn: URNGBK},
Inputs: map[string]string{"i0": out},
Outputs: map[string]string{"i0": gbkOut},
}
@@ -441,17 +441,17 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
m.addNode(outNode)
expandID := fmt.Sprintf("%v_expand", id)
- payload := &pb.ParDoPayload{
- DoFn: &pb.FunctionSpec{
+ payload := &pipepb.ParDoPayload{
+ DoFn: &pipepb.FunctionSpec{
Urn: URNExpand,
Payload:
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
Urn: URNExpand,
})),
},
}
- expand := &pb.PTransform{
+ expand := &pipepb.PTransform{
UniqueName: expandID,
- Spec: &pb.FunctionSpec{
+ Spec: &pipepb.FunctionSpec{
Urn: URNParDo,
Payload: protox.MustEncode(payload),
},
@@ -465,7 +465,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
// Add composite for visualization
cogbkID := fmt.Sprintf("%v_cogbk", id)
- m.transforms[cogbkID] = &pb.PTransform{
+ m.transforms[cogbkID] = &pipepb.PTransform{
UniqueName: edge.Name,
Subtransforms: subtransforms,
}
@@ -531,45 +531,45 @@ func (m *marshaller) expandReshuffle(edge NamedEdge)
string {
{
wfn := window.NewGlobalWindows()
m.pcollections[postReify].WindowingStrategyId =
- m.internWindowingStrategy(&pb.WindowingStrategy{
+ m.internWindowingStrategy(&pipepb.WindowingStrategy{
// Not segregated by time...
WindowFn: makeWindowFn(wfn),
// ...output after every element is received...
- Trigger: &pb.Trigger{
- Trigger: &pb.Trigger_Always_{
- Always: &pb.Trigger_Always{},
+ Trigger: &pipepb.Trigger{
+ Trigger: &pipepb.Trigger_Always_{
+ Always:
&pipepb.Trigger_Always{},
},
},
// ...and after outputing, discard the output
elements...
- AccumulationMode:
pb.AccumulationMode_DISCARDING,
+ AccumulationMode:
pipepb.AccumulationMode_DISCARDING,
// ...and since every pane should have 1
element,
// try to preserve the timestamp.
- OutputTime: pb.OutputTime_EARLIEST_IN_PANE,
+ OutputTime: pipepb.OutputTime_EARLIEST_IN_PANE,
// Defaults copied from
marshalWindowingStrategy.
// TODO(BEAM-3304): migrate to user side
operations once trigger support is in.
EnvironmentId: m.addDefaultEnv(),
- MergeStatus: pb.MergeStatus_NON_MERGING,
+ MergeStatus: pipepb.MergeStatus_NON_MERGING,
WindowCoderId:
m.coders.AddWindowCoder(makeWindowCoder(wfn)),
- ClosingBehavior:
pb.ClosingBehavior_EMIT_IF_NONEMPTY,
+ ClosingBehavior:
pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
AllowedLateness: 0,
- OnTimeBehavior: pb.OnTimeBehavior_FIRE_ALWAYS,
+ OnTimeBehavior:
pipepb.OnTimeBehavior_FIRE_ALWAYS,
})
}
// Inputs (i)
inputID := fmt.Sprintf("%v_reifyts", id)
- payload := &pb.ParDoPayload{
- DoFn: &pb.FunctionSpec{
+ payload := &pipepb.ParDoPayload{
+ DoFn: &pipepb.FunctionSpec{
Urn: URNReshuffleInput,
Payload:
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
Urn: URNReshuffleInput,
})),
},
}
- input := &pb.PTransform{
+ input := &pipepb.PTransform{
UniqueName: inputID,
- Spec: &pb.FunctionSpec{
+ Spec: &pipepb.FunctionSpec{
Urn: URNParDo,
Payload: protox.MustEncode(payload),
},
@@ -588,9 +588,9 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string
{
m.makeNode(gbkOut, gbkCoderID, outNode)
gbkID := fmt.Sprintf("%v_gbk", id)
- gbk := &pb.PTransform{
+ gbk := &pipepb.PTransform{
UniqueName: gbkID,
- Spec: &pb.FunctionSpec{Urn: URNGBK},
+ Spec: &pipepb.FunctionSpec{Urn: URNGBK},
Inputs: map[string]string{"i0": postReify},
Outputs: map[string]string{"i0": gbkOut},
}
@@ -603,17 +603,17 @@ func (m *marshaller) expandReshuffle(edge NamedEdge)
string {
m.pcollections[outPCol].WindowingStrategyId = preservedWSId
outputID := fmt.Sprintf("%v_unreify", id)
- outputPayload := &pb.ParDoPayload{
- DoFn: &pb.FunctionSpec{
+ outputPayload := &pipepb.ParDoPayload{
+ DoFn: &pipepb.FunctionSpec{
Urn: URNReshuffleOutput,
Payload:
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
Urn: URNReshuffleOutput,
})),
},
}
- output := &pb.PTransform{
+ output := &pipepb.PTransform{
UniqueName: outputID,
- Spec: &pb.FunctionSpec{
+ Spec: &pipepb.FunctionSpec{
Urn: URNParDo,
Payload: protox.MustEncode(outputPayload),
},
@@ -626,10 +626,10 @@ func (m *marshaller) expandReshuffle(edge NamedEdge)
string {
// Add composite for visualization, or runner optimization
reshuffleID := fmt.Sprintf("%v_reshuffle", id)
- m.transforms[reshuffleID] = &pb.PTransform{
+ m.transforms[reshuffleID] = &pipepb.PTransform{
UniqueName: edge.Name,
Subtransforms: subtransforms,
- Spec: &pb.FunctionSpec{
+ Spec: &pipepb.FunctionSpec{
Urn: URNReshuffle,
},
}
@@ -637,7 +637,7 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string
{
}
func (m *marshaller) makeNode(id, cid string, n *graph.Node) string {
- col := &pb.PCollection{
+ col := &pipepb.PCollection{
UniqueName: id,
CoderId: cid,
IsBounded: boolToBounded(n.Bounded()),
@@ -647,11 +647,11 @@ func (m *marshaller) makeNode(id, cid string, n
*graph.Node) string {
return id
}
-func boolToBounded(bounded bool) pb.IsBounded_Enum {
+func boolToBounded(bounded bool) pipepb.IsBounded_Enum {
if bounded {
- return pb.IsBounded_BOUNDED
+ return pipepb.IsBounded_BOUNDED
}
- return pb.IsBounded_UNBOUNDED
+ return pipepb.IsBounded_UNBOUNDED
}
func (m *marshaller) addDefaultEnv() string {
@@ -668,7 +668,7 @@ func (m *marshaller) addWindowingStrategy(w
*window.WindowingStrategy) string {
return m.internWindowingStrategy(ws)
}
-func (m *marshaller) internWindowingStrategy(w *pb.WindowingStrategy) string {
+func (m *marshaller) internWindowingStrategy(w *pipepb.WindowingStrategy)
string {
key := proto.MarshalTextString(w)
if id, exists := m.windowing2id[key]; exists {
return id
@@ -682,21 +682,21 @@ func (m *marshaller) internWindowingStrategy(w
*pb.WindowingStrategy) string {
// marshalWindowingStrategy marshals the given windowing strategy in
// the given coder context.
-func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy)
*pb.WindowingStrategy {
- ws := &pb.WindowingStrategy{
+func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy)
*pipepb.WindowingStrategy {
+ ws := &pipepb.WindowingStrategy{
WindowFn: makeWindowFn(w.Fn),
- MergeStatus: pb.MergeStatus_NON_MERGING,
- AccumulationMode: pb.AccumulationMode_DISCARDING,
+ MergeStatus: pipepb.MergeStatus_NON_MERGING,
+ AccumulationMode: pipepb.AccumulationMode_DISCARDING,
WindowCoderId: c.AddWindowCoder(makeWindowCoder(w.Fn)),
- Trigger: &pb.Trigger{
- Trigger: &pb.Trigger_Default_{
- Default: &pb.Trigger_Default{},
+ Trigger: &pipepb.Trigger{
+ Trigger: &pipepb.Trigger_Default_{
+ Default: &pipepb.Trigger_Default{},
},
},
- OutputTime: pb.OutputTime_END_OF_WINDOW,
- ClosingBehavior: pb.ClosingBehavior_EMIT_IF_NONEMPTY,
+ OutputTime: pipepb.OutputTime_END_OF_WINDOW,
+ ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
AllowedLateness: 0,
- OnTimeBehavior: pb.OnTimeBehavior_FIRE_ALWAYS,
+ OnTimeBehavior: pipepb.OnTimeBehavior_FIRE_ALWAYS,
}
return ws
}
@@ -704,33 +704,33 @@ func marshalWindowingStrategy(c *CoderMarshaller, w
*window.WindowingStrategy) *
func makeWindowFn(w *window.Fn) *pb.FunctionSpec {
switch w.Kind {
case window.GlobalWindows:
- return &pb.FunctionSpec{
+ return &pipepb.FunctionSpec{
Urn: URNGlobalWindowsWindowFn,
}
case window.FixedWindows:
- return &pb.FunctionSpec{
+ return &pipepb.FunctionSpec{
Urn: URNFixedWindowsWindowFn,
Payload: protox.MustEncode(
- &pb.FixedWindowsPayload{
+ &pipepb.FixedWindowsPayload{
Size: ptypes.DurationProto(w.Size),
},
),
}
case window.SlidingWindows:
- return &pb.FunctionSpec{
+ return &pipepb.FunctionSpec{
Urn: URNSlidingWindowsWindowFn,
Payload: protox.MustEncode(
- &pb.SlidingWindowsPayload{
+ &pipepb.SlidingWindowsPayload{
Size: ptypes.DurationProto(w.Size),
Period: ptypes.DurationProto(w.Period),
},
),
}
case window.Sessions:
- return &pb.FunctionSpec{
+ return &pipepb.FunctionSpec{
Urn: URNSessionsWindowFn,
Payload: protox.MustEncode(
- &pb.SessionWindowsPayload{
+ &pipepb.SessionWindowsPayload{
GapSize: ptypes.DurationProto(w.Gap),
},
),