This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b8159f0  [Go SDK] Fix side input window coding, & re-enable tests.
     new fa3af06  Merge pull request #7591 from lostluck/singlewindow
b8159f0 is described below

commit b8159f0e803be6866bd46753df82d4282cdab3fb
Author: Robert Burke <rob...@frantil.com>
AuthorDate: Tue Jan 22 22:04:43 2019 +0000

    [Go SDK] Fix side input window coding, & re-enable tests.
---
 sdks/go/pkg/beam/core/runtime/exec/coder.go | 33 +++++++++++++++++++----------
 sdks/go/test/integration/driver.go          |  4 ++--
 2 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go 
b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index b3a1491..012a77f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -250,13 +250,14 @@ func (c *kvDecoder) Decode(r io.Reader) (FullValue, 
error) {
 type WindowEncoder interface {
        // Encode serializes the given value to the writer.
        Encode([]typex.Window, io.Writer) error
+       EncodeSingle(typex.Window, io.Writer) error
 }
 
 // EncodeWindow is a convenience function for encoding a single window into a
 // byte slice.
 func EncodeWindow(c WindowEncoder, w typex.Window) ([]byte, error) {
        var buf bytes.Buffer
-       if err := c.Encode([]typex.Window{w}, &buf); err != nil {
+       if err := c.EncodeSingle(w, &buf); err != nil {
                return nil, err
        }
        return buf.Bytes(), nil
@@ -304,6 +305,10 @@ func (*globalWindowEncoder) Encode(ws []typex.Window, w 
io.Writer) error {
        return coder.EncodeInt32(1, w) // #windows
 }
 
+func (*globalWindowEncoder) EncodeSingle(ws typex.Window, w io.Writer) error {
+       return nil
+}
+
 type globalWindowDecoder struct{}
 
 func (*globalWindowDecoder) Decode(r io.Reader) ([]typex.Window, error) {
@@ -313,25 +318,31 @@ func (*globalWindowDecoder) Decode(r io.Reader) 
([]typex.Window, error) {
 
 type intervalWindowEncoder struct{}
 
-func (*intervalWindowEncoder) Encode(ws []typex.Window, w io.Writer) error {
-       // Encoding: upper bound and duration
-
+func (enc *intervalWindowEncoder) Encode(ws []typex.Window, w io.Writer) error 
{
        if err := coder.EncodeInt32(int32(len(ws)), w); err != nil { // #windows
                return err
        }
        for _, elm := range ws {
-               iw := elm.(window.IntervalWindow)
-               if err := coder.EncodeEventTime(iw.End, w); err != nil {
-                       return err
-               }
-               duration := iw.End.Milliseconds() - iw.Start.Milliseconds()
-               if err := coder.EncodeVarUint64(uint64(duration), w); err != 
nil {
-                       return err
+               if err := enc.EncodeSingle(elm, w); err != nil {
+                       return nil
                }
        }
        return nil
 }
 
+func (*intervalWindowEncoder) EncodeSingle(elm typex.Window, w io.Writer) 
error {
+       // Encoding: upper bound and duration
+       iw := elm.(window.IntervalWindow)
+       if err := coder.EncodeEventTime(iw.End, w); err != nil {
+               return err
+       }
+       duration := iw.End.Milliseconds() - iw.Start.Milliseconds()
+       if err := coder.EncodeVarUint64(uint64(duration), w); err != nil {
+               return err
+       }
+       return nil
+}
+
 type intervalWindowDecoder struct{}
 
 func (*intervalWindowDecoder) Decode(r io.Reader) ([]typex.Window, error) {
diff --git a/sdks/go/test/integration/driver.go 
b/sdks/go/test/integration/driver.go
index 8daac8e..92a5529 100644
--- a/sdks/go/test/integration/driver.go
+++ b/sdks/go/test/integration/driver.go
@@ -59,8 +59,8 @@ func main() {
                {"wordcount:memfs", wordcount.WordCount(old_pond, 
"+Qj8iAnV5BI2A4sbzUbb6Q==", 8)},
                {"wordcount:kinglear", 
wordcount.WordCount("gs://apache-beam-samples/shakespeare/kinglear.txt", 
"7ZCh5ih9m8IW1w+iS8sRKg==", 4749)},
                {"pardo:multioutput", primitives.ParDoMultiOutput()},
-               // BEAM-3286: {"pardo:sideinput", primitives.ParDoSideInput()},
-               // BEAM-3286: {"pardo:kvsideinput", 
primitives.ParDoKVSideInput()},
+               {"pardo:sideinput", primitives.ParDoSideInput()},
+               {"pardo:kvsideinput", primitives.ParDoKVSideInput()},
                {"cogbk:cogbk", primitives.CoGBK()},
                {"flatten:flatten", primitives.Flatten()},
                // {"flatten:dup", primitives.FlattenDup()},

Reply via email to