[
https://issues.apache.org/jira/browse/BEAM-14499?focusedWorklogId=773633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-773633
]
ASF GitHub Bot logged work on BEAM-14499:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 23/May/22 18:11
Start Date: 23/May/22 18:11
Worklog Time Spent: 10m
Work Description: jrmccluskey commented on code in PR #17733:
URL: https://github.com/apache/beam/pull/17733#discussion_r879746160
##########
sdks/go/pkg/beam/testing/passert/equals.go:
##########
@@ -53,13 +54,37 @@ func EqualsList(s beam.Scope, col beam.PCollection, list
interface{}) beam.PColl
return equals(subScope, col, listCollection)
}
+// WindowedEqualsList verifies that the given collection has the same values
as a
+// given list, under coder equality. The values must be provided as an array
or a slice.
+// This function also takes a window function to window the list into for
cases where
+// the PCollections cannot be globally windowed (e.g. tests in unbounded
pipelines.)
+// This windowing function is applied to both the PCollection created from the
list
+// and the impulse used to trigger the Diff function.
+func WindowedEqualsList(s beam.Scope, wfn *window.Fn, col beam.PCollection,
list interface{}) beam.PCollection {
+ subScope := s.Scope("passert.WindowedEqualsList")
+ if list == nil {
+ return Empty(subScope, col)
+ }
+ inter := beam.CreateList(subScope, list)
+ winList := beam.WindowInto(s, wfn, inter)
+ return windowedEquals(subScope, wfn, col, winList)
+}
+
// equals verifies that the actual values match the expected ones.
func equals(s beam.Scope, actual, expected beam.PCollection) beam.PCollection {
unexpected, correct, missing := Diff(s, actual, expected)
beam.ParDo0(s, failIfBadEntries, beam.Impulse(s), beam.SideInput{Input:
unexpected}, beam.SideInput{Input: correct}, beam.SideInput{Input: missing})
return actual
}
+// windowedEquals verifies that the actual values match the expected ones in
cases where the PCollections
+// cannot be globally windowed.
+func windowedEquals(s beam.Scope, wfn *window.Fn, actual, expected
beam.PCollection) beam.PCollection {
+ unexpected, correct, missing := WindowedDiff(s, wfn, actual, expected)
+ beam.ParDo0(s, failIfBadEntries, beam.WindowInto(s, wfn,
beam.Impulse(s)), beam.SideInput{Input: unexpected}, beam.SideInput{Input:
correct}, beam.SideInput{Input: missing})
Review Comment:
Okay that was more doable. We have a windowing call-out now.
Issue Time Tracking
-------------------
Worklog Id: (was: 773633)
Time Spent: 3h (was: 2h 50m)
> TestStream tests are broken after new construction check was added
> ------------------------------------------------------------------
>
> Key: BEAM-14499
> URL: https://issues.apache.org/jira/browse/BEAM-14499
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Jack McCluskey
> Assignee: Jack McCluskey
> Priority: P2
> Time Spent: 3h
> Remaining Estimate: 0h
>
> The Go SDK's TestStream integration tests are currently failing as a result
> of the restrictions on creating unwindowed, unbounded side inputs being added
> for BEAM-14473. These should be fixed so the Flink PostCommit tests give
> signal again.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)