This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new b8159f0  [Go SDK] Fix side input window coding, & re-enable tests.
     new fa3af06  Merge pull request #7591 from lostluck/singlewindow
b8159f0 is described below

commit b8159f0e803be6866bd46753df82d4282cdab3fb
Author: Robert Burke <rob...@frantil.com>
AuthorDate: Tue Jan 22 22:04:43 2019 +0000

    [Go SDK] Fix side input window coding, & re-enable tests.
---
 sdks/go/pkg/beam/core/runtime/exec/coder.go | 33 +++++++++++++++++++----------
 sdks/go/test/integration/driver.go          |  4 ++--
 2 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index b3a1491..012a77f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -250,13 +250,14 @@ func (c *kvDecoder) Decode(r io.Reader) (FullValue, error) {
 type WindowEncoder interface {
 	// Encode serializes the given value to the writer.
 	Encode([]typex.Window, io.Writer) error
+	EncodeSingle(typex.Window, io.Writer) error
 }
 
 // EncodeWindow is a convenience function for encoding a single window into a
 // byte slice.
 func EncodeWindow(c WindowEncoder, w typex.Window) ([]byte, error) {
 	var buf bytes.Buffer
-	if err := c.Encode([]typex.Window{w}, &buf); err != nil {
+	if err := c.EncodeSingle(w, &buf); err != nil {
 		return nil, err
 	}
 	return buf.Bytes(), nil
@@ -304,6 +305,10 @@ func (*globalWindowEncoder) Encode(ws []typex.Window, w io.Writer) error {
 	return coder.EncodeInt32(1, w) // #windows
 }
 
+func (*globalWindowEncoder) EncodeSingle(ws typex.Window, w io.Writer) error {
+	return nil
+}
+
 type globalWindowDecoder struct{}
 
 func (*globalWindowDecoder) Decode(r io.Reader) ([]typex.Window, error) {
@@ -313,25 +318,31 @@ func (*globalWindowDecoder) Decode(r io.Reader) ([]typex.Window, error) {
 
 type intervalWindowEncoder struct{}
 
-func (*intervalWindowEncoder) Encode(ws []typex.Window, w io.Writer) error {
-	// Encoding: upper bound and duration
-
+func (enc *intervalWindowEncoder) Encode(ws []typex.Window, w io.Writer) error {
 	if err := coder.EncodeInt32(int32(len(ws)), w); err != nil { // #windows
 		return err
 	}
 	for _, elm := range ws {
-		iw := elm.(window.IntervalWindow)
-		if err := coder.EncodeEventTime(iw.End, w); err != nil {
-			return err
-		}
-		duration := iw.End.Milliseconds() - iw.Start.Milliseconds()
-		if err := coder.EncodeVarUint64(uint64(duration), w); err != nil {
-			return err
+		if err := enc.EncodeSingle(elm, w); err != nil {
+			return nil
 		}
 	}
 	return nil
 }
 
+func (*intervalWindowEncoder) EncodeSingle(elm typex.Window, w io.Writer) error {
+	// Encoding: upper bound and duration
+	iw := elm.(window.IntervalWindow)
+	if err := coder.EncodeEventTime(iw.End, w); err != nil {
+		return err
+	}
+	duration := iw.End.Milliseconds() - iw.Start.Milliseconds()
+	if err := coder.EncodeVarUint64(uint64(duration), w); err != nil {
+		return err
+	}
+	return nil
+}
+
 type intervalWindowDecoder struct{}
 
 func (*intervalWindowDecoder) Decode(r io.Reader) ([]typex.Window, error) {
diff --git a/sdks/go/test/integration/driver.go b/sdks/go/test/integration/driver.go
index 8daac8e..92a5529 100644
--- a/sdks/go/test/integration/driver.go
+++ b/sdks/go/test/integration/driver.go
@@ -59,8 +59,8 @@ func main() {
 		{"wordcount:memfs", wordcount.WordCount(old_pond, "+Qj8iAnV5BI2A4sbzUbb6Q==", 8)},
 		{"wordcount:kinglear", wordcount.WordCount("gs://apache-beam-samples/shakespeare/kinglear.txt", "7ZCh5ih9m8IW1w+iS8sRKg==", 4749)},
 		{"pardo:multioutput", primitives.ParDoMultiOutput()},
-		// BEAM-3286: {"pardo:sideinput", primitives.ParDoSideInput()},
-		// BEAM-3286: {"pardo:kvsideinput", primitives.ParDoKVSideInput()},
+		{"pardo:sideinput", primitives.ParDoSideInput()},
+		{"pardo:kvsideinput", primitives.ParDoKVSideInput()},
 		{"cogbk:cogbk", primitives.CoGBK()},
 		{"flatten:flatten", primitives.Flatten()},
 		// {"flatten:dup", primitives.FlattenDup()},