[ 
https://issues.apache.org/jira/browse/BEAM-13803?focusedWorklogId=723360&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-723360
 ]

ASF GitHub Bot logged work on BEAM-13803:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Feb/22 01:29
            Start Date: 09/Feb/22 01:29
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on a change in pull request #16775:
URL: https://github.com/apache/beam/pull/16775#discussion_r802187655



##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput_test.go
##########
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "reflect"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+func makeWindowedCoder() *coder.Coder {
+       vCoder := coder.NewDouble()
+       return coder.NewW(vCoder, coder.NewGlobalWindow())
+}
+
+func makeWindowedKVCoder() *coder.Coder {
+       kCoder := coder.NewString()
+       vCoder := coder.NewDouble()
+       kvCoder := coder.NewKV([]*coder.Coder{kCoder, vCoder})
+       return coder.NewW(kvCoder, coder.NewGlobalWindow())
+}
+
+func TestNewSideInputAdapter(t *testing.T) {
+       tests := []struct {
+               name        string
+               sid         StreamID
+               sideInputID string
+               c           *coder.Coder
+               kc          ElementEncoder
+               ec          ElementDecoder
+       }{
+               {
+                       name:        "KV coder",
+                       sid:         StreamID{Port: Port{URL: 
"localhost:8099"}, PtransformID: "n0"},
+                       sideInputID: "i0",
+                       c:           makeWindowedKVCoder(),
+                       kc:          &stringEncoder{},
+                       ec:          &doubleDecoder{},
+               },
+               {
+                       name:        "V coder",
+                       sid:         StreamID{Port: Port{URL: 
"localhost:8099"}, PtransformID: "n0"},
+                       sideInputID: "i0",
+                       c:           makeWindowedCoder(),
+                       kc:          nil,
+                       ec:          &doubleDecoder{},
+               },
+       }
+       for _, test := range tests {
+               adapter := NewSideInputAdapter(test.sid, test.sideInputID, 
test.c, nil)
+               adapterStruct, ok := adapter.(*sideInputAdapter)
+               if !ok {
+                       t.Errorf("failed to convert interface to 
sideInputAdapter struct in test %v", test)
+               }
+               if got, want := adapterStruct.sid, test.sid; got != want {
+                       t.Errorf("got SID %v, want %v", got, want)
+               }
+               if got, want := adapterStruct.sideInputID, test.sideInputID; 
got != want {
+                       t.Errorf("got sideInputID %v, want %v", got, want)
+               }
+               if got, want := adapterStruct.c, test.c; got != want {
+                       t.Errorf("got coder %v, want %v", got, want)
+               }
+               if got, want := reflect.TypeOf(adapterStruct.kc), 
reflect.TypeOf(test.kc); got != want {
+                       t.Errorf("got ElementEncoder type %v, want %v", got, 
want)
+               }
+               if got, want := reflect.TypeOf(adapterStruct.ec), 
reflect.TypeOf(test.ec); got != want {
+                       t.Errorf("got ElementDecoder type %v, want %v", got, 
want)
+               }

Review comment:
       Consider wrapping this in t.Run so that failures can be attributed to 
the specific test case. As it stands, if something failed in two cases, 
it might not be easy to identify where the failures came from.  Further, see 
https://github.com/golang/go/wiki/TestComments#got-before-want and 
https://github.com/golang/go/wiki/TestComments#table-driven-tests-vs-multiple-test-functions
   
   ```suggestion
                t.Run(test.name, func(t *testing.T) {
                                adapter := NewSideInputAdapter(test.sid, 
test.sideInputID, test.c, nil)
                                adapterStruct, ok := adapter.(*sideInputAdapter)
                                if !ok {
                                        t.Errorf("failed to convert interface 
to sideInputAdapter struct in test %v", test)
                                }
                                if got, want := adapterStruct.sid, test.sid; 
got != want {
                                        t.Errorf("got SID %v, want %v", got, 
want)
                                }
                                if got, want := adapterStruct.sideInputID, 
test.sideInputID; got != want {
                                        t.Errorf("got sideInputID %v, want %v", 
got, want)
                                }
                                if got, want := adapterStruct.c, test.c; got != 
want {
                                        t.Errorf("got coder %v, want %v", got, 
want)
                                }
                                if got, want := 
reflect.TypeOf(adapterStruct.kc), reflect.TypeOf(test.kc); got != want {
                                        t.Errorf("got ElementEncoder type %v, 
want %v", got, want)
                                }
                                if got, want := 
reflect.TypeOf(adapterStruct.ec), reflect.TypeOf(test.ec); got != want {
                                        t.Errorf("got ElementDecoder type %v, 
want %v", got, want)
                                }
                })
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -418,7 +379,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) 
([]string, error) {
 
                                si[fmt.Sprintf("i%v", i)] = &pipepb.SideInput{
                                        AccessPattern: &pipepb.FunctionSpec{
-                                               Urn: URNMultimapSideInput,
+                                               Urn: URNIterableSideInput,
                                        },
                                        ViewFn: &pipepb.FunctionSpec{
                                                Urn: "foo",

Review comment:
       In this case, it's "required" by some runner implementations to be 
non-empty, if not actually required by explicit proto semantics.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput_test.go
##########
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "reflect"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+func makeWindowedCoder() *coder.Coder {
+       vCoder := coder.NewDouble()
+       return coder.NewW(vCoder, coder.NewGlobalWindow())
+}
+
+func makeWindowedKVCoder() *coder.Coder {
+       kCoder := coder.NewString()
+       vCoder := coder.NewDouble()
+       kvCoder := coder.NewKV([]*coder.Coder{kCoder, vCoder})
+       return coder.NewW(kvCoder, coder.NewGlobalWindow())
+}
+
+func TestNewSideInputAdapter(t *testing.T) {
+       tests := []struct {
+               name        string
+               sid         StreamID
+               sideInputID string
+               c           *coder.Coder
+               kc          ElementEncoder
+               ec          ElementDecoder
+       }{
+               {
+                       name:        "KV coder",
+                       sid:         StreamID{Port: Port{URL: 
"localhost:8099"}, PtransformID: "n0"},
+                       sideInputID: "i0",
+                       c:           makeWindowedKVCoder(),
+                       kc:          &stringEncoder{},
+                       ec:          &doubleDecoder{},
+               },
+               {
+                       name:        "V coder",
+                       sid:         StreamID{Port: Port{URL: 
"localhost:8099"}, PtransformID: "n0"},
+                       sideInputID: "i0",
+                       c:           makeWindowedCoder(),
+                       kc:          nil,
+                       ec:          &doubleDecoder{},
+               },
+       }
+       for _, test := range tests {
+               adapter := NewSideInputAdapter(test.sid, test.sideInputID, 
test.c, nil)
+               adapterStruct, ok := adapter.(*sideInputAdapter)
+               if !ok {
+                       t.Errorf("failed to convert interface to 
sideInputAdapter struct in test %v", test)
+               }
+               if got, want := adapterStruct.sid, test.sid; got != want {
+                       t.Errorf("got SID %v, want %v", got, want)
+               }
+               if got, want := adapterStruct.sideInputID, test.sideInputID; 
got != want {
+                       t.Errorf("got sideInputID %v, want %v", got, want)
+               }
+               if got, want := adapterStruct.c, test.c; got != want {
+                       t.Errorf("got coder %v, want %v", got, want)
+               }
+               if got, want := reflect.TypeOf(adapterStruct.kc), 
reflect.TypeOf(test.kc); got != want {
+                       t.Errorf("got ElementEncoder type %v, want %v", got, 
want)
+               }
+               if got, want := reflect.TypeOf(adapterStruct.ec), 
reflect.TypeOf(test.ec); got != want {
+                       t.Errorf("got ElementDecoder type %v, want %v", got, 
want)
+               }
+       }
+}
+
+func TestNewKeyedIterable_Unkeyed(t *testing.T) {
+       adapter := NewSideInputAdapter(StreamID{}, "", makeWindowedCoder(), nil)
+       rs, err := adapter.NewKeyedIterable(context.Background(), nil, nil, nil)
+       if err == nil {
+               t.Error("NewKeyedIterable() succeeded when it should have 
failed")
+       }
+       if rs != nil {
+               t.Errorf("NewKeyedIterable() returned a ReStream when it should 
not have, got %v", rs)

Review comment:
       I don't love a plain constructor-only test. Ideally we would fake 
out the cache or state reader behavior, but that feels like a bit much. We'd 
probably want some additional test infrastructure for easier-to-use 
test ReStreams and a test io.ReadCloser to validate the other half of things. 
It's probably better to punt that to another PR. (We have some existing utilities in 
another test file in the exec package.)

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -43,26 +43,71 @@ type sideInputAdapter struct {
        kc          ElementEncoder
        ec          ElementDecoder
        wm          WindowMapper
+       c           *coder.Coder
 }
 
 // NewSideInputAdapter returns a side input adapter for the given StreamID and 
coder.
-// It expects a W<KV<K,V>> coder, because the protocol supports MultiSet 
access only.
+// It expects a W<V> or W<KV<K,V>> coder, because the protocol requires 
windowing information.
 func NewSideInputAdapter(sid StreamID, sideInputID string, c *coder.Coder, wm 
WindowMapper) SideInputAdapter {
-       if !coder.IsW(c) || !coder.IsKV(coder.SkipW(c)) {
-               panic(fmt.Sprintf("expected WKV coder for side input %v: %v", 
sid, c))
+       if !coder.IsW(c) {
+               panic(fmt.Sprintf("expected WV coder for side input %v: %v", 
sid, c))
        }
 
        wc := MakeWindowEncoder(c.Window)
-       kc := MakeElementEncoder(coder.SkipW(c).Components[0])
-       ec := MakeElementDecoder(coder.SkipW(c).Components[1])
-       return &sideInputAdapter{sid: sid, sideInputID: sideInputID, wc: wc, 
kc: kc, ec: ec, wm: wm}
+       var kc ElementEncoder
+       var ec ElementDecoder
+       if coder.IsKV(coder.SkipW(c)) {
+               kc = MakeElementEncoder(coder.SkipW(c).Components[0])
+               ec = MakeElementDecoder(coder.SkipW(c).Components[1])
+       } else {
+               ec = MakeElementDecoder(coder.SkipW(c))

Review comment:
       Good explanation!
   In particular, for MultiMap side inputs, a user requests the values for a 
given key, which we then encode with the window, and we return the 
unwindowed values as the user iterates.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 723360)
    Time Spent: 3h 10m  (was: 3h)

> Add native iterable side input support to the Go SDK
> ----------------------------------------------------
>
>                 Key: BEAM-13803
>                 URL: https://issues.apache.org/jira/browse/BEAM-13803
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Jack McCluskey
>            Assignee: Jack McCluskey
>            Priority: P2
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> The current Go SDK side input implementation handles all side inputs as map 
> side inputs, applying a fixed key to anything that is unkeyed and arranging 
> things into the correct form after the fact. This should be changed to avoid 
> adding an extra fixed-key node and to support iterable side inputs natively.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to