[
https://issues.apache.org/jira/browse/BEAM-3866?focusedWorklogId=81941&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81941
]
ASF GitHub Bot logged work on BEAM-3866:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Mar/18 17:46
Start Date: 19/Mar/18 17:46
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #4885: [BEAM-3866] Remove
WindowedValue on PCollections for Go SDK
URL: https://github.com/apache/beam/pull/4885
This pull request was merged from a forked repository. Because GitHub does not
display the original diff of a fork's pull request once it has been merged, the
diff is reproduced below for the sake of provenance:
diff --git a/sdks/go/pkg/beam/core/graph/bind.go
b/sdks/go/pkg/beam/core/graph/bind.go
index c27d679c47f..093c738a835 100644
--- a/sdks/go/pkg/beam/core/graph/bind.go
+++ b/sdks/go/pkg/beam/core/graph/bind.go
@@ -27,11 +27,9 @@ import (
// up. We should verify that common mistakes yield reasonable errors.
// Bind returns the inbound, outbound and underlying output types for a Fn,
-// when bound to the underlying input types. All top-level fulltype must be
-// Windowed Values, because transforms always work on windowed values for the
-// main input and all outputs -- even if the transform chooses to ignore it.
-// The complication of bind is primarily that UserFns have loose signatures
-// and bind must produce valid type information for the execution plan.
+// when bound to the underlying input types. The complication of bind is
+// primarily that UserFns have loose signatures and bind must produce valid
+// type information for the execution plan.
//
// For example,
//
@@ -39,26 +37,26 @@ import (
// or
// func (context.Context, k typex.X, v int) (string, typex.X, error)
//
-// are UserFns that may take one or two incoming fulltypes: either W<KV<X,int>>
-// or W<X> with a singleton side input of type W<int>. For the purpose of the
+// are UserFns that may take one or two incoming fulltypes: either KV<X,int>
+// or X with a singleton side input of type int. For the purpose of the
// shape of data processing, the two forms are equivalent. The non-data types,
// context.Context and error, are not part of the data signature, but in play
-// only at runtime. EventTime in the first case exposes Window information.
+// only at runtime.
//
-// If either was bound to the input type [W<KV<string,int>>], bind would
return:
+// If either was bound to the input type [KV<string,int>], bind would return:
//
-// inbound: [Main: W<KV<X,int>>]
-// outbound: [W<KV<string,X>>]
-// output: [W<KV<string,string>>]
+// inbound: [Main: KV<X,int>]
+// outbound: [KV<string,X>]
+// output: [KV<string,string>]
//
// Note that it propagates the assignment of X to string in the output type.
//
-// If either was instead bound to the input fulltypes [W<float>, W<int>], the
+// If either was instead bound to the input fulltypes [float, int], the
// result would be:
//
-// inbound: [Main: W<X>, Singleton: W<int>]
-// outbound: [W<KV<string,X>>]
-// output: [W<KV<string, float>>]
+// inbound: [Main: X, Singleton: int]
+// outbound: [KV<string,X>]
+// output: [KV<string, float>]
//
// Here, the inbound shape and output types are different from before.
func Bind(fn *funcx.Fn, typedefs map[string]reflect.Type, in
...typex.FullType) ([]typex.FullType, []InputKind, []typex.FullType,
[]typex.FullType, error) {
@@ -100,9 +98,9 @@ func findOutbound(fn *funcx.Fn) ([]typex.FullType, error) {
case 0:
break // ok: no direct output.
case 1:
- outbound = append(outbound, typex.NewW(typex.New(ret[0])))
+ outbound = append(outbound, typex.New(ret[0]))
case 2:
- outbound = append(outbound, typex.NewWKV(typex.New(ret[0]),
typex.New(ret[1])))
+ outbound = append(outbound, typex.NewKV(typex.New(ret[0]),
typex.New(ret[1])))
default:
return nil, fmt.Errorf("too many return values: %v", ret)
}
@@ -111,9 +109,9 @@ func findOutbound(fn *funcx.Fn) ([]typex.FullType, error) {
values, _ := funcx.UnfoldEmit(param.T)
trimmed := trimIllegal(values)
if len(trimmed) == 2 {
- outbound = append(outbound,
typex.NewWKV(typex.New(trimmed[0]), typex.New(trimmed[1])))
+ outbound = append(outbound,
typex.NewKV(typex.New(trimmed[0]), typex.New(trimmed[1])))
} else {
- outbound = append(outbound,
typex.NewW(typex.New(trimmed[0])))
+ outbound = append(outbound, typex.New(trimmed[0]))
}
}
return outbound, nil
@@ -152,10 +150,10 @@ func findInbound(fn *funcx.Fn, in ...typex.FullType)
([]typex.FullType, []InputK
return inbound, kinds, nil
}
-func tryBindInbound(candidate typex.FullType, args []funcx.FnParam, isMain
bool) (typex.FullType, InputKind, error) {
- arity := inboundArity(candidate, isMain)
+func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool)
(typex.FullType, InputKind, error) {
+ arity := inboundArity(t, isMain)
if len(args) < arity {
- return nil, Main, fmt.Errorf("too few parameters to bind %v",
candidate)
+ return nil, Main, fmt.Errorf("too few parameters to bind %v", t)
}
// log.Printf("Bind inbound %v to %v (main: %v)", candidate, args,
isMain)
@@ -163,11 +161,10 @@ func tryBindInbound(candidate typex.FullType, args
[]funcx.FnParam, isMain bool)
kind := Main
var other typex.FullType
- t := typex.SkipW(candidate)
switch t.Class() {
case typex.Concrete, typex.Container:
if isMain {
- other = typex.NewW(typex.New(args[0].T))
+ other = typex.New(args[0].T)
} else {
// We accept various forms for side input. We have to
disambiguate
// []string into a Singleton of type []string or a
Slice of type
@@ -180,10 +177,10 @@ func tryBindInbound(candidate typex.FullType, args
[]funcx.FnParam, isMain bool)
// TODO(herohde) 6/29/2017: we do not
allow universal slices, for now.
kind = Slice
- other =
typex.NewW(typex.New(args[0].T.Elem()))
+ other = typex.New(args[0].T.Elem())
} else {
kind = Singleton
- other = typex.NewW(typex.New(args[0].T))
+ other = typex.New(args[0].T)
}
case funcx.FnIter:
values, _ := funcx.UnfoldIter(args[0].T)
@@ -193,7 +190,7 @@ func tryBindInbound(candidate typex.FullType, args
[]funcx.FnParam, isMain bool)
}
kind = Iter
- other = typex.NewW(typex.New(trimmed[0]))
+ other = typex.New(trimmed[0])
case funcx.FnReIter:
values, _ := funcx.UnfoldReIter(args[0].T)
@@ -203,7 +200,7 @@ func tryBindInbound(candidate typex.FullType, args
[]funcx.FnParam, isMain bool)
}
kind = ReIter
- other = typex.NewW(typex.New(trimmed[0]))
+ other = typex.New(trimmed[0])
default:
panic(fmt.Sprintf("Unexpected param kind: %v",
arg))
@@ -219,7 +216,7 @@ func tryBindInbound(candidate typex.FullType, args
[]funcx.FnParam, isMain bool)
if args[1].Kind != funcx.FnValue {
return nil, kind, fmt.Errorf("value of
%v cannot bind to %v", t, args[1])
}
- other = typex.NewWKV(typex.New(args[0].T),
typex.New(args[1].T))
+ other = typex.NewKV(typex.New(args[0].T),
typex.New(args[1].T))
} else {
// TODO(herohde) 6/29/2017: side input map form.
@@ -232,7 +229,7 @@ func tryBindInbound(candidate typex.FullType, args
[]funcx.FnParam, isMain bool)
}
kind = Iter
- other =
typex.NewWKV(typex.New(trimmed[0]), typex.New(trimmed[1]))
+ other =
typex.NewKV(typex.New(trimmed[0]), typex.New(trimmed[1]))
case funcx.FnReIter:
values, _ :=
funcx.UnfoldReIter(args[0].T)
@@ -242,7 +239,7 @@ func tryBindInbound(candidate typex.FullType, args
[]funcx.FnParam, isMain bool)
}
kind = ReIter
- other =
typex.NewWKV(typex.New(trimmed[0]), typex.New(trimmed[1]))
+ other =
typex.NewKV(typex.New(trimmed[0]), typex.New(trimmed[1]))
default:
return nil, kind, fmt.Errorf("%v cannot
bind to %v", t, args[0])
@@ -277,7 +274,7 @@ func tryBindInbound(candidate typex.FullType, args
[]funcx.FnParam, isMain bool)
return nil, kind, fmt.Errorf("values of
%v cannot bind to %v", t, args[i])
}
}
- other = typex.NewWCoGBK(components...)
+ other = typex.NewCoGBK(components...)
default:
panic("Unexpected inbound type")
@@ -287,8 +284,8 @@ func tryBindInbound(candidate typex.FullType, args
[]funcx.FnParam, isMain bool)
return nil, kind, fmt.Errorf("unexpected inbound type %v", t)
}
- if !typex.IsStructurallyAssignable(candidate, other) {
- return nil, kind, fmt.Errorf("%v is not assignable to %v",
candidate, other)
+ if !typex.IsStructurallyAssignable(t, other) {
+ return nil, kind, fmt.Errorf("%v is not assignable to %v", t,
other)
}
return other, kind, nil
}
@@ -302,8 +299,6 @@ func inboundArity(t typex.FullType, isMain bool) int {
}
// A KV side input must be a single iterator/map.
return 1
- case typex.WindowedValueType:
- return inboundArity(t.Components()[0], isMain)
case typex.CoGBKType:
return len(t.Components())
default:
diff --git a/sdks/go/pkg/beam/core/graph/bind_test.go
b/sdks/go/pkg/beam/core/graph/bind_test.go
index 6241746711f..01d7c7d6c60 100644
--- a/sdks/go/pkg/beam/core/graph/bind_test.go
+++ b/sdks/go/pkg/beam/core/graph/bind_test.go
@@ -31,109 +31,109 @@ func TestBind(t *testing.T) {
Out []typex.FullType // Outgoing Node type; nil == cannot bind
}{
{ // Direct
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
func(int) int { return 0 },
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
},
{ // Direct w/ KV out
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
func(int) (int, string) { return 0, "" },
- []typex.FullType{typex.NewWKV(typex.New(reflectx.Int),
typex.New(reflectx.String))},
+ []typex.FullType{typex.NewKV(typex.New(reflectx.Int),
typex.New(reflectx.String))},
},
{ // KV Emitter
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
func(int, func(int, string)) {},
- []typex.FullType{typex.NewWKV(typex.New(reflectx.Int),
typex.New(reflectx.String))},
+ []typex.FullType{typex.NewKV(typex.New(reflectx.Int),
typex.New(reflectx.String))},
},
{ // Direct with optionals time/error
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
func(typex.EventTime, int) (typex.EventTime, int,
error) { return typex.EventTime{}, 0, nil },
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
},
{ // Emitter w/ optionals
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
func(typex.EventTime, int, func(typex.EventTime, int))
error { return nil },
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
},
{
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
func(string) int { return 0 },
nil, // int cannot bind to string
},
{
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
func(int, int) int { return 0 },
nil, // int cannot bind to int x int
},
{ // Generic
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
func(x typex.X) typex.X { return x },
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
},
{
- []typex.FullType{typex.NewWKV(typex.New(reflectx.Int),
typex.New(reflectx.String))},
+ []typex.FullType{typex.NewKV(typex.New(reflectx.Int),
typex.New(reflectx.String))},
func(x typex.X) typex.X { return x },
nil, // structural mismatch
},
{ // Generic swap
- []typex.FullType{typex.NewWKV(typex.New(reflectx.Int),
typex.New(reflectx.String))},
+ []typex.FullType{typex.NewKV(typex.New(reflectx.Int),
typex.New(reflectx.String))},
func(x typex.X, y typex.Y) (typex.Y, typex.X) { return
y, x },
-
[]typex.FullType{typex.NewWKV(typex.New(reflectx.String),
typex.New(reflectx.Int))},
+
[]typex.FullType{typex.NewKV(typex.New(reflectx.String),
typex.New(reflectx.Int))},
},
{ // Side input (as singletons)
- []typex.FullType{typex.NewW(typex.New(reflectx.Int8)),
typex.NewW(typex.New(reflectx.Int16)), typex.NewW(typex.New(reflectx.Int32))},
+ []typex.FullType{typex.New(reflectx.Int8),
typex.New(reflectx.Int16), typex.New(reflectx.Int32)},
func(int8, int16, int32, func(string, int)) {},
-
[]typex.FullType{typex.NewWKV(typex.New(reflectx.String),
typex.New(reflectx.Int))},
+
[]typex.FullType{typex.NewKV(typex.New(reflectx.String),
typex.New(reflectx.Int))},
},
{ // Side input (as slice and iter)
- []typex.FullType{typex.NewW(typex.New(reflectx.Int8)),
typex.NewW(typex.New(reflectx.Int16)), typex.NewW(typex.New(reflectx.Int32))},
+ []typex.FullType{typex.New(reflectx.Int8),
typex.New(reflectx.Int16), typex.New(reflectx.Int32)},
func(int8, []int16, func(*int32) bool, func(int8,
[]int16)) {},
- []typex.FullType{typex.NewWKV(typex.New(reflectx.Int8),
typex.New(reflect.SliceOf(reflectx.Int16)))},
+ []typex.FullType{typex.NewKV(typex.New(reflectx.Int8),
typex.New(reflect.SliceOf(reflectx.Int16)))},
},
{ // Generic side input (as iter and re-iter)
- []typex.FullType{typex.NewW(typex.New(reflectx.Int8)),
typex.NewW(typex.New(reflectx.Int16)), typex.NewW(typex.New(reflectx.Int32))},
+ []typex.FullType{typex.New(reflectx.Int8),
typex.New(reflectx.Int16), typex.New(reflectx.Int32)},
func(typex.X, func(*typex.Y) bool, func()
func(*typex.T) bool, func(typex.X, []typex.Y)) {},
- []typex.FullType{typex.NewWKV(typex.New(reflectx.Int8),
typex.New(reflect.SliceOf(reflectx.Int16)))},
+ []typex.FullType{typex.NewKV(typex.New(reflectx.Int8),
typex.New(reflect.SliceOf(reflectx.Int16)))},
},
{ // Generic side output
- []typex.FullType{typex.NewW(typex.New(reflectx.Int8)),
typex.NewW(typex.New(reflectx.Int16)), typex.NewW(typex.New(reflectx.Int32))},
+ []typex.FullType{typex.New(reflectx.Int8),
typex.New(reflectx.Int16), typex.New(reflectx.Int32)},
func(typex.X, typex.Y, typex.Z, func(typex.X,
[]typex.Y), func(int), func(typex.Z)) {},
- []typex.FullType{typex.NewWKV(typex.New(reflectx.Int8),
typex.New(reflect.SliceOf(reflectx.Int16))),
typex.NewW(typex.New(reflectx.Int)), typex.NewW(typex.New(reflectx.Int32))},
+ []typex.FullType{typex.NewKV(typex.New(reflectx.Int8),
typex.New(reflect.SliceOf(reflectx.Int16))), typex.New(reflectx.Int),
typex.New(reflectx.Int32)},
},
{ // Bind as (K, V) ..
- []typex.FullType{typex.NewWKV(typex.New(reflectx.Int8),
typex.New(reflectx.Int16))},
+ []typex.FullType{typex.NewKV(typex.New(reflectx.Int8),
typex.New(reflectx.Int16))},
func(int8, int16) int { return 0 },
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
},
{ // .. bind same input as (V, SI) ..
- []typex.FullType{typex.NewW(typex.New(reflectx.Int8)),
typex.NewW(typex.New(reflectx.Int16))},
+ []typex.FullType{typex.New(reflectx.Int8),
typex.New(reflectx.Int16)},
func(int8, int16) int { return 0 },
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
},
{ // .. and allow other SI forms ..
- []typex.FullType{typex.NewW(typex.New(reflectx.Int8)),
typex.NewW(typex.New(reflectx.Int16))},
+ []typex.FullType{typex.New(reflectx.Int8),
typex.New(reflectx.Int16)},
func(int8, func(*int16) bool) int { return 0 },
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
},
{ // .. which won't work as (K, V).
- []typex.FullType{typex.NewWKV(typex.New(reflectx.Int8),
typex.New(reflectx.Int16))},
+ []typex.FullType{typex.NewKV(typex.New(reflectx.Int8),
typex.New(reflectx.Int16))},
func(int8, func(*int16) bool) int { return 0 },
nil,
},
{ // GBK binding
-
[]typex.FullType{typex.NewWCoGBK(typex.New(reflectx.Int8),
typex.New(reflectx.Int16))},
+
[]typex.FullType{typex.NewCoGBK(typex.New(reflectx.Int8),
typex.New(reflectx.Int16))},
func(int8, func(*int16) bool) int { return 0 },
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
},
{ // CoGBK binding
-
[]typex.FullType{typex.NewWCoGBK(typex.New(reflectx.Int8),
typex.New(reflectx.Int16), typex.New(reflectx.Int32))},
+
[]typex.FullType{typex.NewCoGBK(typex.New(reflectx.Int8),
typex.New(reflectx.Int16), typex.New(reflectx.Int32))},
func(int8, func(*int16) bool, func(*int32) bool) int {
return 0 },
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
},
{ // GBK binding with side input
-
[]typex.FullType{typex.NewWCoGBK(typex.New(reflectx.Int8),
typex.New(reflectx.Int16)), typex.NewW(typex.New(reflectx.Int32))},
+
[]typex.FullType{typex.NewCoGBK(typex.New(reflectx.Int8),
typex.New(reflectx.Int16)), typex.New(reflectx.Int32)},
func(int8, func(*int16) bool, func(*int32) bool) int {
return 0 },
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
},
}
@@ -166,16 +166,16 @@ func TestBindWithTypedefs(t *testing.T) {
Out []typex.FullType // Outgoing Node type; nil == cannot
bind
}{
{ // Typedefs are ignored, if not used
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
map[string]reflect.Type{"X": reflectx.Int},
func(int) int { return 0 },
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
},
{
nil,
map[string]reflect.Type{"X": reflectx.Int},
func() typex.X { return nil },
- []typex.FullType{typex.NewW(typex.New(reflectx.Int))},
+ []typex.FullType{typex.New(reflectx.Int)},
},
{
nil,
diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go
b/sdks/go/pkg/beam/core/graph/coder/coder.go
index 9d05b4081a5..4d9f77ca66b 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -214,8 +214,6 @@ func NewVarInt() *Coder {
return &Coder{Kind: VarInt, T: typex.New(reflectx.Int32)}
}
-// Convenience methods to operate through the top-level WindowedValue.
-
// IsW returns true iff the coder is for a WindowedValue.
func IsW(c *Coder) bool {
return c.Kind == WindowedValue
@@ -238,36 +236,34 @@ func NewW(c *Coder, w *window.Window) *Coder {
}
}
-// IsWKV returns true iff the coder is for a WindowedValue key-value pair.
-func IsWKV(c *Coder) bool {
- return IsW(c) && SkipW(c).Kind == KV
+// IsKV returns true iff the coder is for key-value pairs.
+func IsKV(c *Coder) bool {
+ return c.Kind == KV
}
-// NewWKV returns a WindowedValue coder for the window of KV elements.
-func NewWKV(components []*Coder, w *window.Window) *Coder {
+// NewKV returns a coder for key-value pairs.
+func NewKV(components []*Coder) *Coder {
checkCodersNotNil(components)
- c := &Coder{
+ return &Coder{
Kind: KV,
T: typex.New(typex.KVType, Types(components)...),
Components: components,
}
- return NewW(c, w)
}
-// IsWCoGBK returns true iff the coder is for a windowed CoGBK type.
-func IsWCoGBK(c *Coder) bool {
- return IsW(c) && SkipW(c).Kind == CoGBK
+// IsCoGBK returns true iff the coder is for a CoGBK type.
+func IsCoGBK(c *Coder) bool {
+ return c.Kind == CoGBK
}
-// NewWCoGBK returns a WindowedValue coder for the window of CoGBK elements.
-func NewWCoGBK(components []*Coder, w *window.Window) *Coder {
+// NewCoGBK returns a coder for CoGBK elements.
+func NewCoGBK(components []*Coder) *Coder {
checkCodersNotNil(components)
- c := &Coder{
+ return &Coder{
Kind: CoGBK,
T: typex.New(typex.CoGBKType, Types(components)...),
Components: components,
}
- return NewW(c, w)
}
// SkipW returns the data coder used by a WindowedValue, or returns the coder.
This
diff --git a/sdks/go/pkg/beam/core/graph/edge.go
b/sdks/go/pkg/beam/core/graph/edge.go
index ad3b28bb5fa..e460e1af5b1 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -72,7 +72,7 @@ type Inbound struct {
// * Slice: []typex.T
// * Iter: func(*typex.X) bool
//
- // If the input type is W<KV<int,string>>, say, then the options are:
+ // If the input type is KV<int,string>, say, then the options are:
//
// * Main: int, string (as two separate parameters)
// * Map: map[int]string
@@ -101,8 +101,8 @@ type Inbound struct {
//
// func (ctx context.Context, key int, value typex.X) error
//
- // is a generic DoFn that if bound to W<KV<int,string>> would have one
- // Inbound link with type W<KV<int, X>>.
+ // is a generic DoFn that if bound to KV<int,string> would have one
+ // Inbound link with type KV<int, X>.
Type typex.FullType
}
@@ -121,7 +121,7 @@ type Outbound struct {
//
// func (ctx context.Context, emit func (key int, value typex.X))
error
//
- // is a generic DoFn that produces one Outbound link of type
W<KV<int,X>>.
+ // is a generic DoFn that produces one Outbound link of type KV<int,X>.
Type typex.FullType // representation type of data
}
@@ -188,32 +188,32 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node)
(*MultiEdge, error) {
if len(ns) == 0 {
return nil, fmt.Errorf("cogbk needs at least 1 input")
}
- if !typex.IsWKV(ns[0].Type()) {
+ if !typex.IsKV(ns[0].Type()) {
return nil, fmt.Errorf("input type must be KV: %v", ns[0])
}
// (1) Create CoGBK result type: KV<T,U>, .., KV<T,Z> ->
CoGBK<T,U,..,Z>.
- c := coder.SkipW(ns[0].Coder).Components[0]
+ c := ns[0].Coder.Components[0]
w := ns[0].Window()
- comp := []typex.FullType{c.T, typex.SkipW(ns[0].Type()).Components()[1]}
+ comp := []typex.FullType{c.T, ns[0].Type().Components()[1]}
for i := 1; i < len(ns); i++ {
n := ns[i]
- if !typex.IsWKV(n.Type()) {
+ if !typex.IsKV(n.Type()) {
return nil, fmt.Errorf("input type must be KV: %v", n)
}
- if !coder.SkipW(n.Coder).Components[0].Equals(c) {
- return nil, fmt.Errorf("key coder for %v is %v, want
%v", n, coder.SkipW(n.Coder).Components[0], c)
+ if !n.Coder.Components[0].Equals(c) {
+ return nil, fmt.Errorf("key coder for %v is %v, want
%v", n, n.Coder.Components[0], c)
}
if !w.Equals(n.Window()) {
return nil, fmt.Errorf("mismatched cogbk window types:
%v, want %v", n.Window(), w)
}
- comp = append(comp, typex.SkipW(n.Type()).Components()[1])
+ comp = append(comp, n.Type().Components()[1])
}
- t := typex.NewWCoGBK(comp...)
+ t := typex.NewCoGBK(comp...)
out := g.NewNode(t, w)
// (2) Add CoGBK edge
@@ -243,7 +243,7 @@ func NewFlatten(g *Graph, s *Scope, in []*Node)
(*MultiEdge, error) {
return nil, fmt.Errorf("mismatched flatten window
types: %v, want %v", n.Window(), w)
}
}
- if typex.IsWCoGBK(t) {
+ if typex.IsCoGBK(t) {
return nil, fmt.Errorf("flatten input type cannot be CoGBK:
%v", t)
}
@@ -330,31 +330,30 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in
*Node) (*MultiEdge, error)
inT := in.Type()
- isPerKey := typex.IsWCoGBK(in.Type())
+ isPerKey := typex.IsCoGBK(inT)
if isPerKey {
// For per-key combine, the shape of the inbound type and the
type of the
- // inbound node are different: a node type of W<CoGBK<A,B>>
will become W<B>
- // or W<KV<A,B>>, depending on whether the combineFn is keyed
or not.
+ // inbound node are different: a node type of CoGBK<A,B> will
become B
+ // or KV<A,B>, depending on whether the combineFn is keyed or
not.
// Per-key combines may omit the key in the signature. In such
a case,
// it is ignored for the purpose of binding. The runtime will
later look at
// these types to decide whether to add the key or not.
//
- // However, the outbound type will be W<KV<A,O>> (where O is
the output
+ // However, the outbound type will be KV<A,O> (where O is the
output
// type) regardless of whether the combineFn is keyed or not.
- t := typex.SkipW(in.Type())
- if len(t.Components()) > 2 {
- return nil, fmt.Errorf("combine cannot follow
multi-input CoGBK: %v", t)
+ if len(inT.Components()) > 2 {
+ return nil, fmt.Errorf("combine cannot follow
multi-input CoGBK: %v", inT)
}
if len(synth.Param) == 1 {
- inT = typex.NewW(t.Components()[1]) // Drop implicit
key for binding purposes
+ inT = inT.Components()[1] // Drop implicit key for
binding purposes
} else {
- inT = typex.NewWKV(t.Components()...)
+ inT = typex.NewKV(inT.Components()...)
}
// The runtime always adds the key for the output of per-key
combiners.
- key := t.Components()[0]
+ key := in.Type().Components()[0]
synth.Ret = append([]funcx.ReturnParam{{Kind: funcx.RetValue,
T: key.Type()}}, synth.Ret...)
}
@@ -377,10 +376,10 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in
*Node) (*MultiEdge, error)
// NewImpulse inserts a new Impulse edge into the graph. It must use the
// built-in bytes coder.
func NewImpulse(g *Graph, s *Scope, value []byte) *MultiEdge {
- ft := typex.NewW(typex.New(reflectx.ByteSlice))
+ ft := typex.New(reflectx.ByteSlice)
w := window.NewGlobalWindow()
n := g.NewNode(ft, w)
- n.Coder = coder.NewW(coder.NewBytes(), w)
+ n.Coder = coder.NewBytes()
edge := g.NewEdge(s)
edge.Op = Impulse
diff --git a/sdks/go/pkg/beam/core/graph/node.go
b/sdks/go/pkg/beam/core/graph/node.go
index d27aa0e0bf1..a7ad2272f4a 100644
--- a/sdks/go/pkg/beam/core/graph/node.go
+++ b/sdks/go/pkg/beam/core/graph/node.go
@@ -25,12 +25,12 @@ import (
// Node is a typed connector describing the data type and encoding. A node
// may have multiple inbound and outbound connections. The underlying type
-// must be a complete windowed type, i.e., not include any type variables.
+// must be a complete type, i.e., not include any type variables.
type Node struct {
id int
// t is the type of underlying data and cannot change. It must be equal
to
- // the coder type. A node type root would always be a WindowedValue. The
- // type must be bound, i.e., it cannot contain any type variables.
+ // the coder type. The type must be bound, i.e., it cannot contain any
+ // type variables.
t typex.FullType
// Coder defines the data encoding. It can be changed, but must be of
@@ -46,7 +46,7 @@ func (n *Node) ID() int {
return n.id
}
-// Type returns the underlying full type of the data, such as
W<KV<int,string>>.
+// Type returns the underlying full type of the data, such as KV<int,string>.
func (n *Node) Type() typex.FullType {
return n.t
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go
b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index e41be11f94f..ce9c630ffc2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -278,8 +278,8 @@ func (c *kvDecoder) Decode(r io.Reader) (FullValue, error) {
// TODO(herohde) 4/7/2017: actually handle windows.
-// EncodeWindowedValueHeader uses the supplied coder to serialize a windowed
value header.
-func EncodeWindowedValueHeader(c *coder.Coder, t typex.EventTime, w io.Writer)
error {
+// EncodeWindowedValueHeader serializes a windowed value header.
+func EncodeWindowedValueHeader(t typex.EventTime, w io.Writer) error {
// Encoding: Timestamp, Window, Pane (header) + Element
if (time.Time)(t).IsZero() {
@@ -297,8 +297,8 @@ func EncodeWindowedValueHeader(c *coder.Coder, t
typex.EventTime, w io.Writer) e
return err
}
-// DecodeWindowedValueHeader uses the supplied coder to deserialize a windowed
value header.
-func DecodeWindowedValueHeader(c *coder.Coder, r io.Reader) (typex.EventTime,
error) {
+// DecodeWindowedValueHeader deserializes a windowed value header.
+func DecodeWindowedValueHeader(r io.Reader) (typex.EventTime, error) {
// Encoding: Timestamp, Window, Pane (header) + Element
t, err := coder.DecodeEventTime(r)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
index 1cf9ca040e3..5ac65067867 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
@@ -37,7 +37,7 @@ func TestCombine(t *testing.T) {
}
g := graph.New()
- in := g.NewNode(typex.NewW(typex.New(reflectx.Int)),
window.NewGlobalWindow())
+ in := g.NewNode(typex.New(reflectx.Int), window.NewGlobalWindow())
edge, err := graph.NewCombine(g, g.Root(), fn, in)
if err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasink.go
b/sdks/go/pkg/beam/core/runtime/exec/datasink.go
index e9491324160..ebf8ddc2407 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasink.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasink.go
@@ -40,8 +40,7 @@ func (n *DataSink) ID() UnitID {
}
func (n *DataSink) Up(ctx context.Context) error {
- c := coder.SkipW(n.Coder)
- n.enc = MakeElementEncoder(c)
+ n.enc = MakeElementEncoder(coder.SkipW(n.Coder))
return nil
}
@@ -61,8 +60,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value
FullValue, values .
// unit.
var b bytes.Buffer
- c := n.Coder
- if err := EncodeWindowedValueHeader(c, value.Timestamp, &b); err != nil
{
+ if err := EncodeWindowedValueHeader(value.Timestamp, &b); err != nil {
return err
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index ca2e49fa645..ef67ecb085e 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -63,14 +63,14 @@ func (n *DataSource) Process(ctx context.Context) error {
}
defer r.Close()
- c := n.Coder
+ c := coder.SkipW(n.Coder)
switch {
- case coder.IsWCoGBK(c):
- ck := MakeElementDecoder(coder.SkipW(c).Components[0])
- cv := MakeElementDecoder(coder.SkipW(c).Components[1])
+ case coder.IsCoGBK(c):
+ ck := MakeElementDecoder(c.Components[0])
+ cv := MakeElementDecoder(c.Components[1])
for {
- t, err := DecodeWindowedValueHeader(c, r)
+ t, err := DecodeWindowedValueHeader(r)
if err != nil {
if err == io.EOF {
return nil
@@ -143,11 +143,11 @@ func (n *DataSource) Process(ctx context.Context) error {
}
default:
- ec := MakeElementDecoder(coder.SkipW(c))
+ ec := MakeElementDecoder(c)
for {
atomic.AddInt64(&n.count, 1)
- t, err := DecodeWindowedValueHeader(c, r)
+ t, err := DecodeWindowedValueHeader(r)
if err != nil {
if err == io.EOF {
return nil
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
index c9daa8355af..ad9a70c8b98 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
@@ -53,11 +53,11 @@ func TestParDo(t *testing.T) {
}
g := graph.New()
- nN := g.NewNode(typex.NewW(typex.New(reflectx.Int)),
window.NewGlobalWindow())
- aN := g.NewNode(typex.NewW(typex.New(reflectx.Int)),
window.NewGlobalWindow())
- bN := g.NewNode(typex.NewW(typex.New(reflectx.Int)),
window.NewGlobalWindow())
- cN := g.NewNode(typex.NewW(typex.New(reflectx.Int)),
window.NewGlobalWindow())
- dN := g.NewNode(typex.NewW(typex.New(reflectx.Int)),
window.NewGlobalWindow())
+ nN := g.NewNode(typex.New(reflectx.Int), window.NewGlobalWindow())
+ aN := g.NewNode(typex.New(reflectx.Int), window.NewGlobalWindow())
+ bN := g.NewNode(typex.New(reflectx.Int), window.NewGlobalWindow())
+ cN := g.NewNode(typex.New(reflectx.Int), window.NewGlobalWindow())
+ dN := g.NewNode(typex.New(reflectx.Int), window.NewGlobalWindow())
edge, err := graph.NewParDo(g, g.Root(), fn, []*graph.Node{nN, aN, bN,
cN, dN}, nil)
if err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 91aefae642b..291ef3a0d1d 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -51,7 +51,7 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor)
(*Plan, error) {
return nil, fmt.Errorf("expected one output from
DataSource, got %v", transform.GetOutputs())
}
- port, err := unmarshalPort(transform.GetSpec().GetPayload())
+ port, cid, err :=
unmarshalPort(transform.GetSpec().GetPayload())
if err != nil {
return nil, err
}
@@ -65,9 +65,17 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor)
(*Plan, error) {
if err != nil {
return nil, err
}
- u.Coder, err = b.makeCoderForPCollection(pid)
- if err != nil {
- return nil, err
+
+ if cid == "" {
+ u.Coder, err = b.makeCoderForPCollection(pid)
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ u.Coder, err = b.coders.Coder(cid) // Expected
to be windowed coder
+ if err != nil {
+ return nil, err
+ }
}
}
@@ -174,7 +182,12 @@ func (b *builder) makeCoderForPCollection(id string)
(*coder.Coder, error) {
if !ok {
return nil, fmt.Errorf("pcollection %v not found", id)
}
- return b.coders.Coder(col.CoderId)
+ c, err := b.coders.Coder(col.CoderId)
+ if err != nil {
+ return nil, err
+ }
+ // TODO(herohde) 3/16/2018: remove potential WindowedValue from
Dataflow.
+ return coder.SkipW(c), nil
}
func (b *builder) makePCollection(id string) (Node, error) {
@@ -309,8 +322,8 @@ func (b *builder) makeLink(from string, id linkID) (Node,
error) {
return nil, err
}
- n.IsPerKey = coder.IsWCoGBK(c)
- n.UsesKey = typex.IsWKV(in[0].Type)
+ n.IsPerKey = coder.IsCoGBK(c)
+ n.UsesKey = typex.IsKV(in[0].Type)
u = n
@@ -323,10 +336,10 @@ func (b *builder) makeLink(from string, id linkID) (Node,
error) {
if err != nil {
return nil, err
}
- if !coder.IsWKV(c) {
+ if !coder.IsKV(c) {
return nil, fmt.Errorf("unexpected inject
coder: %v", c)
}
- u = &Inject{UID: b.idgen.New(), N: (int)(tp.Inject.N),
ValueEncoder: MakeElementEncoder(coder.SkipW(c).Components[1]), Out: out[0]}
+ u = &Inject{UID: b.idgen.New(), N: (int)(tp.Inject.N),
ValueEncoder: MakeElementEncoder(c.Components[1]), Out: out[0]}
case graphx.URNExpand:
var pid string
@@ -337,12 +350,12 @@ func (b *builder) makeLink(from string, id linkID) (Node,
error) {
if err != nil {
return nil, err
}
- if !coder.IsWCoGBK(c) {
+ if !coder.IsCoGBK(c) {
return nil, fmt.Errorf("unexpected expand
coder: %v", c)
}
var decoders []ElementDecoder
- for _, dc := range coder.SkipW(c).Components[1:] {
+ for _, dc := range c.Components[1:] {
decoders = append(decoders,
MakeElementDecoder(dc))
}
u = &Expand{UID: b.idgen.New(), ValueDecoders:
decoders, Out: out[0]}
@@ -352,7 +365,7 @@ func (b *builder) makeLink(from string, id linkID) (Node,
error) {
}
case urnDataSink:
- port, err := unmarshalPort(payload)
+ port, cid, err := unmarshalPort(payload)
if err != nil {
return nil, err
}
@@ -362,9 +375,16 @@ func (b *builder) makeLink(from string, id linkID) (Node,
error) {
for key, pid := range transform.GetInputs() {
sink.Target = Target{ID: id.to, Name: key}
- sink.Coder, err = b.makeCoderForPCollection(pid)
- if err != nil {
- return nil, err
+ if cid == "" {
+ sink.Coder, err = b.makeCoderForPCollection(pid)
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ sink.Coder, err = b.coders.Coder(cid) //
Expected to be windowed coder
+ if err != nil {
+ return nil, err
+ }
}
}
u = sink
@@ -422,12 +442,12 @@ func unmarshalKeyedValues(m map[string]string) []string {
return ret
}
-func unmarshalPort(data []byte) (Port, error) {
+func unmarshalPort(data []byte) (Port, string, error) {
var port fnpb.RemoteGrpcPort
if err := proto.Unmarshal(data, &port); err != nil {
- return Port{}, err
+ return Port{}, "", err
}
return Port{
URL: port.GetApiServiceDescriptor().GetUrl(),
- }, nil
+ }, port.CoderId, nil
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
index a07ac6bf64d..a4d525f2599 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
@@ -67,16 +67,16 @@ func TestMarshalUnmarshalCoders(t *testing.T) {
coder.NewW(coder.NewBytes(), window.NewGlobalWindow()),
},
{
- "W<KV<foo,bar>>",
- coder.NewWKV([]*coder.Coder{foo, bar},
window.NewGlobalWindow()),
+ "KV<foo,bar>",
+ coder.NewKV([]*coder.Coder{foo, bar}),
},
{
- "W<CoGBK<foo,bar>>",
- coder.NewWCoGBK([]*coder.Coder{foo, bar},
window.NewGlobalWindow()),
+ "CoGBK<foo,bar>",
+ coder.NewCoGBK([]*coder.Coder{foo, bar}),
},
{
- "W<CoGBK<foo,bar,baz>>",
- coder.NewWCoGBK([]*coder.Coder{foo, bar, baz},
window.NewGlobalWindow()),
+ "CoGBK<foo,bar,baz>",
+ coder.NewCoGBK([]*coder.Coder{foo, bar, baz}),
},
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
b/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
index ae6e89425ed..0ec3ba8ac93 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
@@ -69,40 +69,38 @@ const (
URNExpand = "beam:go:transform:expand:v1"
)
-// MakeKVUnionCoder returns W<KV<K,KV<int,[]byte>>> for a given CoGBK.
+// MakeKVUnionCoder returns KV<K,KV<int,[]byte>> for a given CoGBK.
func MakeKVUnionCoder(gbk *graph.MultiEdge) *coder.Coder {
if gbk.Op != graph.CoGBK {
panic(fmt.Sprintf("expected CoGBK, got %v", gbk))
}
from := gbk.Input[0].From
- key := coder.SkipW(from.Coder).Components[0]
- return coder.NewWKV([]*coder.Coder{key, makeUnionCoder()},
from.Window())
+ key := from.Coder.Components[0]
+ return coder.NewKV([]*coder.Coder{key, makeUnionCoder()})
}
-// MakeGBKUnionCoder returns W<CoGBK<K,KV<int,[]byte>>> for a given CoGBK.
+// MakeGBKUnionCoder returns CoGBK<K,KV<int,[]byte>> for a given CoGBK.
func MakeGBKUnionCoder(gbk *graph.MultiEdge) *coder.Coder {
if gbk.Op != graph.CoGBK {
panic(fmt.Sprintf("expected CoGBK, got %v", gbk))
}
from := gbk.Input[0].From
- key := coder.SkipW(from.Coder).Components[0]
- return coder.NewWCoGBK([]*coder.Coder{key, makeUnionCoder()},
from.Window())
+ key := from.Coder.Components[0]
+ return coder.NewCoGBK([]*coder.Coder{key, makeUnionCoder()})
}
// makeUnionCoder returns a coder for the raw union value, KV<int,[]byte>. It uses
// varintz instead of the built-in varint to avoid the implicit length-prefixing
// of varint otherwise introduced by Dataflow.
func makeUnionCoder() *coder.Coder {
- t := typex.New(typex.KVType, typex.New(reflectx.Int),
typex.New(reflectx.ByteSlice))
-
c, err := coderx.NewVarIntZ(reflectx.Int)
if err != nil {
panic(err)
}
- return &coder.Coder{Kind: coder.KV, T: t, Components: []*coder.Coder{
+ return coder.NewKV([]*coder.Coder{
{Kind: coder.Custom, T: typex.New(reflectx.Int), Custom: c},
coder.NewBytes(),
- }}
+ })
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
index 722ed184abe..b362c087fe5 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
@@ -59,11 +59,11 @@ func pick(t *testing.T, g *graph.Graph) *graph.MultiEdge {
}
func intT() typex.FullType {
- return typex.NewW(typex.New(reflectx.Int))
+ return typex.New(reflectx.Int)
}
func intCoder() *coder.Coder {
- return coder.NewW(custom("int", reflectx.Int), window.NewGlobalWindow())
+ return custom("int", reflectx.Int)
}
// TestParDo verifies that ParDo can be serialized.
diff --git a/sdks/go/pkg/beam/core/typex/class.go
b/sdks/go/pkg/beam/core/typex/class.go
index fb9588c3782..8dfa79fbd54 100644
--- a/sdks/go/pkg/beam/core/typex/class.go
+++ b/sdks/go/pkg/beam/core/typex/class.go
@@ -163,7 +163,7 @@ func IsUniversal(t reflect.Type) bool {
}
// IsComposite returns true iff the given type is one of the predefined
-// Composite marker types: KV, GBK, CoGBK or WindowedValue.
+// Composite marker types: KV, CoGBK or WindowedValue.
func IsComposite(t reflect.Type) bool {
switch t {
case KVType, CoGBKType, WindowedValueType:
diff --git a/sdks/go/pkg/beam/core/typex/fulltype.go
b/sdks/go/pkg/beam/core/typex/fulltype.go
index 2b843762f80..a6ab4a3434a 100644
--- a/sdks/go/pkg/beam/core/typex/fulltype.go
+++ b/sdks/go/pkg/beam/core/typex/fulltype.go
@@ -157,37 +157,44 @@ func isAnyNonKVComposite(list []FullType) bool {
return false
}
-// Convenience methods to operate through the top-level WindowedValue.
+// Convenience functions.
+// IsW returns true iff the type is a WindowedValue.
func IsW(t FullType) bool {
return t.Type() == WindowedValueType
}
+// NewW constructs a new WindowedValue of the given type.
func NewW(t FullType) FullType {
return New(WindowedValueType, t)
}
-func IsWKV(t FullType) bool {
- return IsW(t) && SkipW(t).Type() == KVType
+// SkipW skips a WindowedValue layer, if present. If not, returns the input.
+func SkipW(t FullType) FullType {
+ if t.Type() == WindowedValueType {
+ return t.Components()[0]
+ }
+ return t
}
-func NewWKV(components ...FullType) FullType {
- return NewW(New(KVType, components...))
+// IsKV returns true iff the type is a KV.
+func IsKV(t FullType) bool {
+ return t.Type() == KVType
}
-func IsWCoGBK(t FullType) bool {
- return IsW(t) && SkipW(t).Type() == CoGBKType
+// NewKV constructs a new KV of the given key and value types.
+func NewKV(components ...FullType) FullType {
+ return New(KVType, components...)
}
-func NewWCoGBK(components ...FullType) FullType {
- return NewW(New(CoGBKType, components...))
+// IsCoGBK returns true iff the type is a CoGBK.
+func IsCoGBK(t FullType) bool {
+ return t.Type() == CoGBKType
}
-func SkipW(t FullType) FullType {
- if t.Type() == WindowedValueType {
- return t.Components()[0]
- }
- return t
+// NewCoGBK constructs a new CoGBK of the given component types.
+func NewCoGBK(components ...FullType) FullType {
+ return New(CoGBKType, components...)
}
// IsStructurallyAssignable returns true iff a from value is structurally
diff --git a/sdks/go/pkg/beam/core/typex/fulltype_test.go
b/sdks/go/pkg/beam/core/typex/fulltype_test.go
index a0eb0d769ef..c27befeff96 100644
--- a/sdks/go/pkg/beam/core/typex/fulltype_test.go
+++ b/sdks/go/pkg/beam/core/typex/fulltype_test.go
@@ -29,11 +29,11 @@ func TestIsBound(t *testing.T) {
}{
{New(reflectx.Int), true},
{New(TType), false},
- {NewWCoGBK(New(TType), New(reflectx.String)), false},
- {NewWCoGBK(New(reflectx.String), New(reflectx.String)), true},
- {NewWKV(New(reflectx.String),
New(reflect.SliceOf(reflectx.Int))), true},
- {NewWKV(New(reflectx.String), New(reflect.SliceOf(XType))),
false},
- {NewWKV(New(reflectx.String), New(reflectx.String)), true},
+ {NewCoGBK(New(TType), New(reflectx.String)), false},
+ {NewCoGBK(New(reflectx.String), New(reflectx.String)), true},
+ {NewKV(New(reflectx.String),
New(reflect.SliceOf(reflectx.Int))), true},
+ {NewKV(New(reflectx.String), New(reflect.SliceOf(XType))),
false},
+ {NewKV(New(reflectx.String), New(reflectx.String)), true},
}
for _, test := range tests {
@@ -53,13 +53,13 @@ func TestIsStructurallyAssignable(t *testing.T) {
{New(reflectx.Int64), New(reflectx.Int32), false}, // from Go
assignability
{New(reflectx.Int), New(TType), true},
{New(XType), New(TType), true},
- {NewWKV(New(XType), New(YType)), New(TType), false},
// T cannot match composites
- {NewWKV(New(reflectx.Int), New(reflectx.Int)),
NewWCoGBK(New(reflectx.Int), New(reflectx.Int)), false}, // structural mismatch
- {NewWKV(New(XType), New(reflectx.Int)), NewWKV(New(TType),
New(UType)), true},
- {NewWKV(New(XType), New(reflectx.Int)), NewWKV(New(TType),
New(XType)), true},
- {NewWKV(New(reflectx.String), New(reflectx.Int)),
NewWKV(New(TType), New(TType)), true},
- {NewWKV(New(reflectx.Int), New(reflectx.Int)),
NewWKV(New(TType), New(TType)), true},
- {NewWKV(New(reflectx.Int), New(reflectx.String)),
NewWKV(New(TType), New(reflectx.String)), true},
+ {NewKV(New(XType), New(YType)), New(TType), false},
// T cannot match composites
+ {NewKV(New(reflectx.Int), New(reflectx.Int)),
NewCoGBK(New(reflectx.Int), New(reflectx.Int)), false}, // structural mismatch
+ {NewKV(New(XType), New(reflectx.Int)), NewKV(New(TType),
New(UType)), true},
+ {NewKV(New(XType), New(reflectx.Int)), NewKV(New(TType),
New(XType)), true},
+ {NewKV(New(reflectx.String), New(reflectx.Int)),
NewKV(New(TType), New(TType)), true},
+ {NewKV(New(reflectx.Int), New(reflectx.Int)), NewKV(New(TType),
New(TType)), true},
+ {NewKV(New(reflectx.Int), New(reflectx.String)),
NewKV(New(TType), New(reflectx.String)), true},
}
for _, test := range tests {
@@ -76,32 +76,32 @@ func TestBindSubstitute(t *testing.T) {
{
New(reflectx.String),
New(XType),
- NewWKV(New(reflectx.Int), New(XType)),
- NewWKV(New(reflectx.Int), New(reflectx.String)),
+ NewKV(New(reflectx.Int), New(XType)),
+ NewKV(New(reflectx.Int), New(reflectx.String)),
},
{
New(reflectx.String),
New(XType),
- NewWKV(New(reflectx.Int), New(reflectx.Int)),
- NewWKV(New(reflectx.Int), New(reflectx.Int)),
+ NewKV(New(reflectx.Int), New(reflectx.Int)),
+ NewKV(New(reflectx.Int), New(reflectx.Int)),
},
{
- NewWKV(New(reflectx.Int), New(reflectx.String)),
- NewWKV(New(XType), New(YType)),
- NewWCoGBK(New(XType), New(XType)),
- NewWCoGBK(New(reflectx.Int), New(reflectx.Int)),
+ NewKV(New(reflectx.Int), New(reflectx.String)),
+ NewKV(New(XType), New(YType)),
+ NewCoGBK(New(XType), New(XType)),
+ NewCoGBK(New(reflectx.Int), New(reflectx.Int)),
},
{
- NewWCoGBK(New(reflectx.Int), New(reflectx.String)),
- NewWCoGBK(New(XType), New(YType)),
- NewWCoGBK(New(YType), New(XType)),
- NewWCoGBK(New(reflectx.String), New(reflectx.Int)),
+ NewCoGBK(New(reflectx.Int), New(reflectx.String)),
+ NewCoGBK(New(XType), New(YType)),
+ NewCoGBK(New(YType), New(XType)),
+ NewCoGBK(New(reflectx.String), New(reflectx.Int)),
},
{
- NewWCoGBK(New(ZType), New(XType)),
- NewWCoGBK(New(XType), New(YType)),
- NewWCoGBK(New(YType), New(XType)),
- NewWCoGBK(New(XType), New(ZType)),
+ NewCoGBK(New(ZType), New(XType)),
+ NewCoGBK(New(XType), New(YType)),
+ NewCoGBK(New(YType), New(XType)),
+ NewCoGBK(New(XType), New(ZType)),
},
}
diff --git a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
index 07e315a8b64..ef2b553b33e 100644
--- a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
+++ b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
@@ -27,7 +27,6 @@ import (
"cloud.google.com/go/bigquery"
"github.com/apache/beam/sdks/go/pkg/beam"
- "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
@@ -163,7 +162,7 @@ func mustParseTable(table string) QualifiedTableName {
// Write writes the elements of the given PCollection<T> to bigquery. T is required
// to be the schema type.
func Write(s beam.Scope, project, table string, col beam.PCollection) {
- t := typex.SkipW(col.Type()).Type()
+ t := col.Type().Type()
mustInferSchema(t)
qn := mustParseTable(table)
diff --git a/sdks/go/pkg/beam/partition.go b/sdks/go/pkg/beam/partition.go
index 06ce7336825..d9580485656 100644
--- a/sdks/go/pkg/beam/partition.go
+++ b/sdks/go/pkg/beam/partition.go
@@ -23,7 +23,6 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
- "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
)
@@ -44,7 +43,7 @@ func Partition(s Scope, n int, fn interface{}, col
PCollection) []PCollection {
if n < 1 {
panic(fmt.Sprintf("n must be > 0"))
}
- t := typex.SkipW(col.Type()).Type()
+ t := col.Type().Type()
funcx.MustSatisfy(fn, funcx.Replace(sig, TType, t))
// The partitionFn is a DoFn with a signature that is dependent on the
input, so
diff --git a/sdks/go/pkg/beam/pcollection.go b/sdks/go/pkg/beam/pcollection.go
index de2d56bdfe1..a4d85e2479b 100644
--- a/sdks/go/pkg/beam/pcollection.go
+++ b/sdks/go/pkg/beam/pcollection.go
@@ -23,12 +23,12 @@ import (
)
// PCollection is an immutable collection of values of type 'A', which must be
-// a concrete Windowed Value type, such as W<int> or W<KV<int,string>>. A
-// PCollection can contain either a bounded or unbounded number of elements.
-// Bounded and unbounded PCollections are produced as the output of PTransforms
-// (including root PTransforms like textio.Read), and can be passed as the
-// inputs of other PTransforms. Some root transforms produce bounded
-// PCollections and others produce unbounded ones.
+// a concrete type, such as int or KV<int,string>. A PCollection can contain
+// either a bounded or unbounded number of elements. Bounded and unbounded
+// PCollections are produced as the output of PTransforms (including root
+// PTransforms like textio.Read), and can be passed as the inputs of other
+// PTransforms. Some root transforms produce bounded PCollections and others
+// produce unbounded ones.
//
// Each element in a PCollection has an associated timestamp. Sources assign
// timestamps to elements when they create PCollections, and other PTransforms
@@ -53,7 +53,7 @@ func (p PCollection) IsValid() bool {
// TODO(herohde) 5/30/2017: add windowing strategy and documentation.
// Type returns the full type 'A' of the elements. 'A' must be a concrete
-// Windowed Value type, such as W<int> or W<KV<int,string>>.
+// type, such as int or KV<int,string>.
func (p PCollection) Type() FullType {
if !p.IsValid() {
panic("Invalid PCollection")
diff --git a/sdks/go/pkg/beam/runners/dataflow/translate.go
b/sdks/go/pkg/beam/runners/dataflow/translate.go
index 17948e080e5..c2d4f88a11c 100644
--- a/sdks/go/pkg/beam/runners/dataflow/translate.go
+++ b/sdks/go/pkg/beam/runners/dataflow/translate.go
@@ -32,6 +32,7 @@ import (
rnapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
df "google.golang.org/api/dataflow/v1b3"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
)
const (
@@ -90,7 +91,7 @@ func translate(edges []*graph.MultiEdge) ([]*df.Step, error) {
// be before the present one.
ref := nodes[edge.Input[i].From.ID()]
- c, err :=
graphx.EncodeCoderRef(edge.Input[i].From.Coder)
+ c, err :=
encodeCoderRef(edge.Input[i].From.Coder)
if err != nil {
return nil, err
}
@@ -116,7 +117,7 @@ func translate(edges []*graph.MultiEdge) ([]*df.Step,
error) {
for _, out := range edge.Output {
ref := nodes[out.To.ID()]
- coder, err := graphx.EncodeCoderRef(out.To.Coder)
+ coder, err := encodeCoderRef(out.To.Coder)
if err != nil {
return nil, err
}
@@ -131,7 +132,7 @@ func translate(edges []*graph.MultiEdge) ([]*df.Step,
error) {
// Dataflow seems to require at least one output. We insert
// a bogus one (named "bogus") and remove it in the harness.
- coder, err :=
graphx.EncodeCoderRef(edge.Input[0].From.Coder)
+ coder, err := encodeCoderRef(edge.Input[0].From.Coder)
if err != nil {
return nil, err
}
@@ -158,11 +159,11 @@ func expandCoGBK(nodes map[int]*outputReference, edge
*graph.MultiEdge) ([]*df.S
// TODO(BEAM-490): replace once CoGBK is a primitive. For now, we have to translate
// CoGBK with multiple PCollections as described in graphx/cogbk.go.
- kvCoder, err := graphx.EncodeCoderRef(graphx.MakeKVUnionCoder(edge))
+ kvCoder, err := encodeCoderRef(graphx.MakeKVUnionCoder(edge))
if err != nil {
return nil, err
}
- gbkCoder, err := graphx.EncodeCoderRef(graphx.MakeGBKUnionCoder(edge))
+ gbkCoder, err := encodeCoderRef(graphx.MakeGBKUnionCoder(edge))
if err != nil {
return nil, err
}
@@ -249,7 +250,7 @@ func expandCoGBK(nodes map[int]*outputReference, edge
*graph.MultiEdge) ([]*df.S
// Expand
ref := nodes[edge.Output[0].To.ID()]
- coder, err := graphx.EncodeCoderRef(edge.Output[0].To.Coder)
+ coder, err := encodeCoderRef(edge.Output[0].To.Coder)
if err != nil {
return nil, err
}
@@ -303,13 +304,11 @@ func translateNodes(edges []*graph.MultiEdge)
map[int]*outputReference {
func translateEdge(edge *graph.MultiEdge) (string, properties, error) {
switch edge.Op {
case graph.Impulse:
- c := edge.Output[0].To.Coder
-
// NOTE: The impulse []data value is encoded in a special way as a
// URL Query-escaped windowed _unnested_ value. It is read back in
// a nested context at runtime.
var buf bytes.Buffer
- if err := exec.EncodeWindowedValueHeader(c,
beam.EventTime(time.Now()), &buf); err != nil {
+ if err :=
exec.EncodeWindowedValueHeader(beam.EventTime(time.Now()), &buf); err != nil {
return "", properties{}, err
}
value := string(append(buf.Bytes(), edge.Value...))
@@ -374,6 +373,11 @@ func makeSerializedFnPayload(payload *v1.TransformPayload)
string {
return protox.MustEncodeBase64(payload)
}
+func encodeCoderRef(c *coder.Coder) (*graphx.CoderRef, error) {
+ // TODO(herohde) 3/16/2018: ensure windowed values for Dataflow
+ return graphx.EncodeCoderRef(coder.NewW(c, window.NewGlobalWindow()))
+}
+
// buildName computes a Dataflow composite name understood by the Dataflow UI,
// determined by the scope nesting. Dataflow simply uses "/" to separate
// composite transforms, so we must remove them from the otherwise qualified
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go
b/sdks/go/pkg/beam/runners/direct/direct.go
index fb207dfe95a..78945daa0cc 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -231,8 +231,8 @@ func (b *builder) makeLink(id linkID) (exec.Node, error) {
return b.links[id], nil
case graph.Combine:
- isPerKey := typex.IsWCoGBK(edge.Input[0].From.Type())
- usesKey := typex.IsWKV(edge.Input[0].Type)
+ isPerKey := typex.IsCoGBK(edge.Input[0].From.Type())
+ usesKey := typex.IsKV(edge.Input[0].Type)
u = &exec.Combine{UID: b.idgen.New(), Fn: edge.CombineFn,
IsPerKey: isPerKey, UsesKey: usesKey, Out: out[0]}
diff --git a/sdks/go/pkg/beam/runners/direct/gbk.go
b/sdks/go/pkg/beam/runners/direct/gbk.go
index 909e02a1321..922dcac7568 100644
--- a/sdks/go/pkg/beam/runners/direct/gbk.go
+++ b/sdks/go/pkg/beam/runners/direct/gbk.go
@@ -21,7 +21,6 @@ import (
"fmt"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
- "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
)
@@ -45,7 +44,7 @@ func (n *CoGBK) ID() exec.UnitID {
}
func (n *CoGBK) Up(ctx context.Context) error {
- n.enc =
exec.MakeElementEncoder(coder.SkipW(n.Edge.Input[0].From.Coder).Components[0])
+ n.enc =
exec.MakeElementEncoder(n.Edge.Input[0].From.Coder.Components[0])
n.m = make(map[string]*group)
return nil
}
diff --git a/sdks/go/pkg/beam/testing/passert/passert.go
b/sdks/go/pkg/beam/testing/passert/passert.go
index 925d655cea0..400a0a66544 100644
--- a/sdks/go/pkg/beam/testing/passert/passert.go
+++ b/sdks/go/pkg/beam/testing/passert/passert.go
@@ -79,7 +79,7 @@ type diffFn struct {
}
func (f *diffFn) ProcessElement(_ []byte, ls, rs func(*beam.T) bool, left,
both, right func(t beam.T)) error {
- c := coder.SkipW(beam.UnwrapCoder(f.Coder.Coder))
+ c := beam.UnwrapCoder(f.Coder.Coder)
indexL, err := index(c, ls)
if err != nil {
@@ -178,10 +178,10 @@ func Empty(s beam.Scope, col beam.PCollection)
beam.PCollection {
func fail(s beam.Scope, col beam.PCollection, format string) {
switch {
- case typex.IsWKV(col.Type()):
+ case typex.IsKV(col.Type()):
beam.ParDo0(s, &failKVFn{Format: format}, col)
- case typex.IsWCoGBK(col.Type()):
+ case typex.IsCoGBK(col.Type()):
beam.ParDo0(s, &failGBKFn{Format: format}, col)
default:
diff --git a/sdks/go/pkg/beam/transforms/filter/filter.go
b/sdks/go/pkg/beam/transforms/filter/filter.go
index be27787649e..d9eb1797e29 100644
--- a/sdks/go/pkg/beam/transforms/filter/filter.go
+++ b/sdks/go/pkg/beam/transforms/filter/filter.go
@@ -22,7 +22,6 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
- "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
)
@@ -48,9 +47,7 @@ func init() {
func Include(s beam.Scope, col beam.PCollection, fn interface{})
beam.PCollection {
s = s.Scope("filter.Include")
- t := typex.SkipW(col.Type()).Type()
- funcx.MustSatisfy(fn, funcx.Replace(sig, beam.TType, t))
-
+ funcx.MustSatisfy(fn, funcx.Replace(sig, beam.TType, col.Type().Type()))
return beam.ParDo(s, &filterFn{Predicate: beam.EncodedFunc{Fn:
reflectx.MakeFunc(fn)}, Include: true}, col)
}
@@ -68,9 +65,7 @@ func Include(s beam.Scope, col beam.PCollection, fn
interface{}) beam.PCollectio
func Exclude(s beam.Scope, col beam.PCollection, fn interface{})
beam.PCollection {
s = s.Scope("filter.Exclude")
- t := typex.SkipW(col.Type()).Type()
- funcx.MustSatisfy(fn, funcx.Replace(sig, beam.TType, t))
-
+ funcx.MustSatisfy(fn, funcx.Replace(sig, beam.TType, col.Type().Type()))
return beam.ParDo(s, &filterFn{Predicate: beam.EncodedFunc{Fn:
reflectx.MakeFunc(fn)}, Include: false}, col)
}
diff --git a/sdks/go/pkg/beam/validate.go b/sdks/go/pkg/beam/validate.go
index f08e5d0a3aa..38b9b537889 100644
--- a/sdks/go/pkg/beam/validate.go
+++ b/sdks/go/pkg/beam/validate.go
@@ -25,17 +25,17 @@ import (
// ValidateKVType panics if the type of the PCollection is not KV<A,B>.
// It returns (A,B).
func ValidateKVType(col PCollection) (typex.FullType, typex.FullType) {
- if !typex.IsWKV(col.Type()) {
+ t := col.Type()
+ if !typex.IsKV(t) {
panic(fmt.Sprintf("pcollection must be of KV type: %v", col))
}
- t := typex.SkipW(col.Type())
return t.Components()[0], t.Components()[1]
}
// ValidateNonCompositeType panics if the type of the PCollection is a
// composite type. It returns the type.
func ValidateNonCompositeType(col PCollection) typex.FullType {
- t := typex.SkipW(col.Type())
+ t := col.Type()
if typex.IsComposite(t.Type()) {
panic(fmt.Sprintf("pcollection must be of non-composite type:
%v", col))
}
diff --git a/sdks/go/pkg/beam/x/debug/head.go b/sdks/go/pkg/beam/x/debug/head.go
index 91302367a29..b4c68a1a218 100644
--- a/sdks/go/pkg/beam/x/debug/head.go
+++ b/sdks/go/pkg/beam/x/debug/head.go
@@ -33,7 +33,7 @@ func Head(s beam.Scope, col beam.PCollection, n int)
beam.PCollection {
s = s.Scope("debug.Head")
switch {
- case typex.IsWKV(col.Type()):
+ case typex.IsKV(col.Type()):
return beam.ParDo(s, &headKVFn{N: n}, beam.Impulse(s),
beam.SideInput{Input: col})
default:
return beam.ParDo(s, &headFn{N: n}, beam.Impulse(s),
beam.SideInput{Input: col})
diff --git a/sdks/go/pkg/beam/x/debug/print.go
b/sdks/go/pkg/beam/x/debug/print.go
index 57d49d4e33c..57146d059b3 100644
--- a/sdks/go/pkg/beam/x/debug/print.go
+++ b/sdks/go/pkg/beam/x/debug/print.go
@@ -43,9 +43,9 @@ func Printf(s beam.Scope, format string, col
beam.PCollection) beam.PCollection
s = s.Scope("debug.Print")
switch {
- case typex.IsWKV(col.Type()):
+ case typex.IsKV(col.Type()):
return beam.ParDo(s, &printKVFn{Format: format}, col)
- case typex.IsWCoGBK(col.Type()):
+ case typex.IsCoGBK(col.Type()):
return beam.ParDo(s, &printGBKFn{Format: format}, col)
default:
return beam.ParDo(s, &printFn{Format: format}, col)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 81941)
Time Spent: 1h (was: 50m)
> Move Go SDK to not use WindowedValue for PCollections
> -----------------------------------------------------
>
> Key: BEAM-3866
> URL: https://issues.apache.org/jira/browse/BEAM-3866
> Project: Beam
> Issue Type: Bug
> Components: sdk-go
> Reporter: Henning Rohde
> Assignee: Henning Rohde
> Priority: Major
> Labels: portability
> Time Spent: 1h
> Remaining Estimate: 0h
>
> The windowing information is part of the gRPC instructions. Dataflow still
> expects the old way.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)