jrmccluskey commented on a change in pull request #15981:
URL: https://github.com/apache/beam/pull/15981#discussion_r750309234



##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
        return p
 }
+
+type stringPair struct {
+       K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+       return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+       {"amy", "[email protected]"},
+       {"james", "[email protected]"},
+       {"carl", "[email protected]"},
+       {"julia", "[email protected]"},
+       {"carl", "[email protected]"},
+       {"james", "[email protected]"},
+}
+
+var phoneSlice = []stringPair{
+       {"amy", "111-222-3333"},
+       {"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+       initial := beam.CreateList(s, input)
+       return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checks that the multimap side input access pattern
+// works correctly, properly producing the correct output for the asymmetric
+// join.
+func ParDoMultiMapSideInput() *beam.Pipeline {
+       beam.Init()
+       p, s := beam.NewPipelineWithRoot()
+       emailsKV := CreateAndSplit(s.Scope("CreateEmails"), emailSlice)
+       phonesKV := CreateAndSplit(s.Scope("CreatePhones"), phoneSlice)
+       output := beam.ParDo(s, asymJoinFn, phonesKV, beam.SideInput{Input: emailsKV})
+       passert.Count(s, output, "post-join", 2)
+       amyOut, jamesOut := beam.ParDo2(s, splitByName, output)
+       passert.Equals(s, amyOut, "[email protected]", "111-222-3333")
+       passert.Equals(s, jamesOut, "[email protected]", "[email protected]", "222-333-4444")
+       return p
+}
+
+func asymJoinFn(k, v string, mapSide func(string) func(*string) bool) (string, []string) {
+       var out string
+       var results []string
+       results = append(results, v)
+       iter := mapSide(k)
+       for iter(&out) {
+               results = append(results, out)
+       }
+       return k, results
+}
+
+func splitByName(key string, vals []string, a, j func(string)) {

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to