[
https://issues.apache.org/jira/browse/BEAM-14499?focusedWorklogId=773615&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-773615
]
ASF GitHub Bot logged work on BEAM-14499:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 23/May/22 17:34
Start Date: 23/May/22 17:34
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17733:
URL: https://github.com/apache/beam/pull/17733#discussion_r879716039
##########
sdks/go/pkg/beam/testing/passert/equals.go:
##########
@@ -53,13 +54,37 @@ func EqualsList(s beam.Scope, col beam.PCollection, list
interface{}) beam.PColl
return equals(subScope, col, listCollection)
}
+// WindowedEqualsList verifies that the given collection has the same values
as a
+// given list, under coder equality. The values must be provided as an array
or a slice.
+// This function also takes a window function to window the list into for
cases where
+// the PCollections cannot be globally windowed (e.g. tests in unbounded
pipelines.)
+// This windowing function is applied to both the PCollection created from the
list
+// and the impulse used to trigger the Diff function.
+func WindowedEqualsList(s beam.Scope, wfn *window.Fn, col beam.PCollection,
list interface{}) beam.PCollection {
+ subScope := s.Scope("passert.WindowedEqualsList")
+ if list == nil {
+ return Empty(subScope, col)
+ }
+ inter := beam.CreateList(subScope, list)
+ winList := beam.WindowInto(s, wfn, inter)
+ return windowedEquals(subScope, wfn, col, winList)
+}
+
// equals verifies that the actual values match the expected ones.
func equals(s beam.Scope, actual, expected beam.PCollection) beam.PCollection {
unexpected, correct, missing := Diff(s, actual, expected)
beam.ParDo0(s, failIfBadEntries, beam.Impulse(s), beam.SideInput{Input:
unexpected}, beam.SideInput{Input: correct}, beam.SideInput{Input: missing})
return actual
}
+// windowedEquals verifies that the actual values match the expected ones in
cases where the PCollections
+// cannot be globally windowed.
+func windowedEquals(s beam.Scope, wfn *window.Fn, actual, expected
beam.PCollection) beam.PCollection {
+ unexpected, correct, missing := WindowedDiff(s, wfn, actual, expected)
+ beam.ParDo0(s, failIfBadEntries, beam.WindowInto(s, wfn,
beam.Impulse(s)), beam.SideInput{Input: unexpected}, beam.SideInput{Input:
correct}, beam.SideInput{Input: missing})
Review Comment:
Yeah, I meant information about the actual window, not the windowing
function. Basically, if you get different results because of windowing it would
be nice if that was obvious from the error
Issue Time Tracking
-------------------
Worklog Id: (was: 773615)
Time Spent: 2h 50m (was: 2h 40m)
> TestStream tests are broken after new construction check was added
> ------------------------------------------------------------------
>
> Key: BEAM-14499
> URL: https://issues.apache.org/jira/browse/BEAM-14499
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Jack McCluskey
> Assignee: Jack McCluskey
> Priority: P2
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> The Go SDK's TestStream integration tests are currently failing as a result
> of the restrictions on creating unwindowed, unbounded side inputs being added
> for BEAM-14473. These should be fixed so the Flink PostCommit tests give
> signal again.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)