[ 
https://issues.apache.org/jira/browse/BEAM-14499?focusedWorklogId=773582&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-773582
 ]

ASF GitHub Bot logged work on BEAM-14499:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/May/22 16:31
            Start Date: 23/May/22 16:31
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17733:
URL: https://github.com/apache/beam/pull/17733#discussion_r879651487


##########
sdks/go/pkg/beam/testing/passert/equals.go:
##########
@@ -53,13 +54,37 @@ func EqualsList(s beam.Scope, col beam.PCollection, list interface{}) beam.PColl
        return equals(subScope, col, listCollection)
 }
 
+// WindowedEqualsList verifies that the given collection has the same values as a
+// given list, under coder equality. The values must be provided as an array or a slice.
+// This function also takes a window function to window the list into for cases where
+// the PCollections cannot be globally windowed (e.g. tests in unbounded pipelines.)
+// This windowing function is applied to both the PCollection created from the list
+// and the impulse used to trigger the Diff function.
+func WindowedEqualsList(s beam.Scope, wfn *window.Fn, col beam.PCollection, list interface{}) beam.PCollection {

Review Comment:
   I think I'm on board with adding this function (it also seems generally 
useful for testing any custom windows users might eventually have). I'm 
wondering, however, whether it makes sense to apply some reasonable default 
windowing in the main EqualsList function (and then call windowedEquals instead 
of equals). If you did that, could you get rid of all your changes to the 
teststream.go tests?
   
   The advantage of that approach is that users who don't care about explicitly 
testing windows could just call EqualsList and wouldn't have to know about 
this function unless they want a custom windowFn.



##########
sdks/go/pkg/beam/testing/passert/equals.go:
##########
@@ -53,13 +54,37 @@ func EqualsList(s beam.Scope, col beam.PCollection, list interface{}) beam.PColl
        return equals(subScope, col, listCollection)
 }
 
+// WindowedEqualsList verifies that the given collection has the same values as a
+// given list, under coder equality. The values must be provided as an array or a slice.
+// This function also takes a window function to window the list into for cases where
+// the PCollections cannot be globally windowed (e.g. tests in unbounded pipelines.)
+// This windowing function is applied to both the PCollection created from the list
+// and the impulse used to trigger the Diff function.
+func WindowedEqualsList(s beam.Scope, wfn *window.Fn, col beam.PCollection, list interface{}) beam.PCollection {
+       subScope := s.Scope("passert.WindowedEqualsList")
+       if list == nil {
+               return Empty(subScope, col)
+       }
+       inter := beam.CreateList(subScope, list)
+       winList := beam.WindowInto(s, wfn, inter)
+       return windowedEquals(subScope, wfn, col, winList)
+}
+
 // equals verifies that the actual values match the expected ones.
 func equals(s beam.Scope, actual, expected beam.PCollection) beam.PCollection {
        unexpected, correct, missing := Diff(s, actual, expected)
        beam.ParDo0(s, failIfBadEntries, beam.Impulse(s), beam.SideInput{Input: unexpected}, beam.SideInput{Input: correct}, beam.SideInput{Input: missing})
        return actual
 }
 
+// windowedEquals verifies that the actual values match the expected ones in cases where the PCollections
+// cannot be globally windowed.
+func windowedEquals(s beam.Scope, wfn *window.Fn, actual, expected beam.PCollection) beam.PCollection {
+       unexpected, correct, missing := WindowedDiff(s, wfn, actual, expected)
+       beam.ParDo0(s, failIfBadEntries, beam.WindowInto(s, wfn, beam.Impulse(s)), beam.SideInput{Input: unexpected}, beam.SideInput{Input: correct}, beam.SideInput{Input: missing})

Review Comment:
   Right now I don't think failIfBadEntries prints out windowing information - 
it probably should since that can now make a difference.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 773582)
    Time Spent: 2h  (was: 1h 50m)

> TestStream tests are broken after new construction check was added
> ------------------------------------------------------------------
>
>                 Key: BEAM-14499
>                 URL: https://issues.apache.org/jira/browse/BEAM-14499
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Jack McCluskey
>            Assignee: Jack McCluskey
>            Priority: P2
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> The Go SDK's TestStream integration tests are currently failing as a result 
> of the restrictions on creating unwindowed, unbounded side inputs being added 
> for BEAM-14473. These should be fixed so the Flink PostCommit tests give 
> signal again. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
