[ 
https://issues.apache.org/jira/browse/BEAM-14499?focusedWorklogId=773601&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-773601
 ]

ASF GitHub Bot logged work on BEAM-14499:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/May/22 17:05
            Start Date: 23/May/22 17:05
    Worklog Time Spent: 10m 
      Work Description: jrmccluskey commented on code in PR #17733:
URL: https://github.com/apache/beam/pull/17733#discussion_r879694833


##########
sdks/go/pkg/beam/testing/passert/equals.go:
##########
@@ -53,13 +54,37 @@ func EqualsList(s beam.Scope, col beam.PCollection, list 
interface{}) beam.PColl
        return equals(subScope, col, listCollection)
 }
 
+// WindowedEqualsList verifies that the given collection has the same values 
as a
+// given list, under coder equality. The values must be provided as an array 
or a slice.
+// This function also takes a window function to window the list into for 
cases where
+// the PCollections cannot be globally windowed (e.g. tests in unbounded 
pipelines.)
+// This windowing function is applied to both the PCollection created from the 
list
+// and the impulse used to trigger the Diff function.
+func WindowedEqualsList(s beam.Scope, wfn *window.Fn, col beam.PCollection, 
list interface{}) beam.PCollection {
+       subScope := s.Scope("passert.WindowedEqualsList")
+       if list == nil {
+               return Empty(subScope, col)
+       }
+       inter := beam.CreateList(subScope, list)
+       winList := beam.WindowInto(s, wfn, inter)
+       return windowedEquals(subScope, wfn, col, winList)
+}
+
 // equals verifies that the actual values match the expected ones.
 func equals(s beam.Scope, actual, expected beam.PCollection) beam.PCollection {
        unexpected, correct, missing := Diff(s, actual, expected)
        beam.ParDo0(s, failIfBadEntries, beam.Impulse(s), beam.SideInput{Input: 
unexpected}, beam.SideInput{Input: correct}, beam.SideInput{Input: missing})
        return actual
 }
 
+// windowedEquals verifies that the actual values match the expected ones in 
cases where the PCollections
+// cannot be globally windowed.
+func windowedEquals(s beam.Scope, wfn *window.Fn, actual, expected 
beam.PCollection) beam.PCollection {
+       unexpected, correct, missing := WindowedDiff(s, wfn, actual, expected)
+       beam.ParDo0(s, failIfBadEntries, beam.WindowInto(s, wfn, 
beam.Impulse(s)), beam.SideInput{Input: unexpected}, beam.SideInput{Input: 
correct}, beam.SideInput{Input: missing})

Review Comment:
   I don't think we can be window function aware at that level.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 773601)
    Time Spent: 2h 40m  (was: 2.5h)

> TestStream tests are broken after new construction check was added
> ------------------------------------------------------------------
>
>                 Key: BEAM-14499
>                 URL: https://issues.apache.org/jira/browse/BEAM-14499
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Jack McCluskey
>            Assignee: Jack McCluskey
>            Priority: P2
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The Go SDK's TestStream integration tests are currently failing because of 
> the restrictions on creating unwindowed, unbounded side inputs that were added 
> for BEAM-14473. These tests should be fixed so the Flink PostCommit tests give 
> signal again. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to