[ 
https://issues.apache.org/jira/browse/BEAM-3293?focusedWorklogId=681751&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-681751
 ]

ASF GitHub Bot logged work on BEAM-3293:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Nov/21 22:58
            Start Date: 15/Nov/21 22:58
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on a change in pull request #15981:
URL: https://github.com/apache/beam/pull/15981#discussion_r749736267



##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
        return p
 }
+
+type stringPair struct {
+       K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+       return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+       {"amy", "[email protected]"},
+       {"james", "[email protected]"},
+       {"carl", "[email protected]"},
+       {"julia", "[email protected]"},
+       {"carl", "[email protected]"},
+       {"james", "[email protected]"},
+}
+
+var phoneSlice = []stringPair{
+       {"amy", "111-222-3333"},
+       {"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+       initial := beam.CreateList(s, input)
+       return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checkls that the multimap side input access pattern

Review comment:
       ```suggestion
   // ParDoMultiMapSideInput checks that the multimap side input access pattern
   ```

##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
        return p
 }
+
+type stringPair struct {
+       K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+       return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+       {"amy", "[email protected]"},
+       {"james", "[email protected]"},
+       {"carl", "[email protected]"},
+       {"julia", "[email protected]"},
+       {"carl", "[email protected]"},
+       {"james", "[email protected]"},
+}
+
+var phoneSlice = []stringPair{
+       {"amy", "111-222-3333"},
+       {"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+       initial := beam.CreateList(s, input)
+       return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checkls that the multimap side input access pattern
+// works correctly, properly producing the correct output. for the asymmetric 
joing

Review comment:
       ```suggestion
   // works correctly, properly producing the correct output with an asymmetric 
join.
   ```

##########
File path: sdks/go/test/integration/integration.go
##########
@@ -67,6 +67,8 @@ var directFilters = []string{
        "TestTestStream.*",
        // (BEAM-13075): The direct runner does not support windowed side inputs
        "TestValidateWindowedSideInputs",
+       // The direct runner does not currently support multimap side inputs

Review comment:
       Please add the JIRA for this issue. Likely the work can be consolidated 
w/ the windowed side inputs above, as it'll require similar/identical work.
   
   Scope creep, but very related scope creep.

##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
        return p
 }
+
+type stringPair struct {
+       K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+       return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+       {"amy", "[email protected]"},
+       {"james", "[email protected]"},
+       {"carl", "[email protected]"},
+       {"julia", "[email protected]"},
+       {"carl", "[email protected]"},
+       {"james", "[email protected]"},
+}
+
+var phoneSlice = []stringPair{
+       {"amy", "111-222-3333"},
+       {"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+       initial := beam.CreateList(s, input)
+       return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checkls that the multimap side input access pattern
+// works correctly, properly producing the correct output. for the asymmetric 
joing
+func ParDoMultiMapSideInput() *beam.Pipeline {
+       beam.Init()
+       p, s := beam.NewPipelineWithRoot()
+       emailsKV := CreateAndSplit(s.Scope("CreateEmails"), emailSlice)
+       phonesKV := CreateAndSplit(s.Scope("CreatePhones"), phoneSlice)
+       output := beam.ParDo(s, asymJoinFn, phonesKV, beam.SideInput{Input: 
emailsKV})
+       passert.Count(s, output, "post-join", 2)
+       amyOut, jamesOut := beam.ParDo2(s, splitByName, output)
+       passert.Equals(s, amyOut, "[email protected]", "111-222-3333")
+       passert.Equals(s, jamesOut, "[email protected]", "[email protected]", 
"222-333-4444")
+       return p
+}
+
+func asymJoinFn(k, v string, mapSide func(string) func(*string) bool) (string, 
[]string) {
+       var out string
+       var results []string
+       results = append(results, v)
+       iter := mapSide(k)
+       for iter(&out) {
+               results = append(results, out)
+       }
+       return k, results
+}
+
+func splitByName(key string, vals []string, a, j func(string)) {

Review comment:
       Consider adding a 3rd "catch all" output that is used if the keys don't 
match as a default case. Then it can be verified to be empty with 
`passert.Empty`.

##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
        return p
 }
+
+type stringPair struct {
+       K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+       return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+       {"amy", "[email protected]"},
+       {"james", "[email protected]"},
+       {"carl", "[email protected]"},
+       {"julia", "[email protected]"},
+       {"carl", "[email protected]"},
+       {"james", "[email protected]"},
+}
+
+var phoneSlice = []stringPair{
+       {"amy", "111-222-3333"},
+       {"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+       initial := beam.CreateList(s, input)
+       return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checkls that the multimap side input access pattern
+// works correctly, properly producing the correct output. for the asymmetric 
joing
+func ParDoMultiMapSideInput() *beam.Pipeline {
+       beam.Init()
+       p, s := beam.NewPipelineWithRoot()
+       emailsKV := CreateAndSplit(s.Scope("CreateEmails"), emailSlice)
+       phonesKV := CreateAndSplit(s.Scope("CreatePhones"), phoneSlice)
+       output := beam.ParDo(s, asymJoinFn, phonesKV, beam.SideInput{Input: 
emailsKV})
+       passert.Count(s, output, "post-join", 2)
+       amyOut, jamesOut := beam.ParDo2(s, splitByName, output)
+       passert.Equals(s, amyOut, "[email protected]", "111-222-3333")
+       passert.Equals(s, jamesOut, "[email protected]", "[email protected]", 
"222-333-4444")
+       return p
+}
+
+func asymJoinFn(k, v string, mapSide func(string) func(*string) bool) (string, 
[]string) {
+       var out string
+       var results []string
+       results = append(results, v)

Review comment:
       ```suggestion
        results := []string{v}
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 681751)
    Time Spent: 4h 10m  (was: 4h)

> Add lazy map side input form
> ----------------------------
>
>                 Key: BEAM-3293
>                 URL: https://issues.apache.org/jira/browse/BEAM-3293
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Henning Rohde
>            Assignee: Jack McCluskey
>            Priority: P3
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> Add InputKinds LazyMap and LazyMultiMap that allow map lookup without reading 
> everything to memory. They will be accessed through functions such as:
> func(K) func(*V) bool   (a keyed function that returns an iterator)
>  func(K) []V                         (a keyed function that returns a slice 
> of values)
> On the execution layer, the new forms would need to be added to 
> exec/sideinput.go
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sideinput.go]
>  
>  The inputs layer, for the actual abstraction using reflection:
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/input.go]
> The funcx package would need to be updated to detect the new parameter forms 
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/fn.go]
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/sideinput.go]
> as well as the DoFn graph validation code
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/fn.go#L566]
> They would need to be correctly translated into the pipeline protos:
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L315]
>  and finally back to the newly created handlers in the exec package. 
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L402]
> If implemented pre-generics, the code generator frontend and backend would 
> need to be updated to detect and generate code for efficient map access 
> functions with no reflection overhead. 
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/shimx/generate.go]
>  
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/starcgenx/starcgenx.go]
> Unit tests must be added throughout, and integration tests should be added to 
> verify the functionality against portable Beam runners.
>  
> [https://github.com/apache/beam/tree/master/sdks/go/test/integration/primitives]
> And of course, the user GoDoc should be updated for the support.
> See this lengthy email response for a more in-depth guide to how Side Inputs 
> operate. 
> [https://lists.apache.org/thread.html/ra42dc7ee30842f11740eff33f0afcd63702695878e427127e1268381%40%3Cdev.beam.apache.org%3E]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to