Repository: beam Updated Branches: refs/heads/master e10fbdaa2 -> 395d14e33
Actually run after-count tests. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f194057 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f194057 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f194057 Branch: refs/heads/master Commit: 1f1940577bde19cf46c99deafb09687f331822bc Parents: e10fbda Author: Robert Bradshaw <[email protected]> Authored: Mon May 8 16:18:20 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Tue May 9 09:35:05 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/transforms/trigger_test.py | 32 ++++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1f194057/sdks/python/apache_beam/transforms/trigger_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index 38871fe..c08a791 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -402,22 +402,22 @@ class RunnerApiTest(unittest.TestCase): class TriggerPipelineTest(unittest.TestCase): def test_after_count(self): - p = TestPipeline() - result = (p - | beam.Create([1, 2, 3, 4, 5, 10, 11]) - | beam.FlatMap(lambda t: [('A', t), ('B', t + 5)]) - | beam.Map(lambda (k, t): TimestampedValue((k, t), t)) - | beam.WindowInto(FixedWindows(10), trigger=AfterCount(3), - accumulation_mode=AccumulationMode.DISCARDING) - | beam.GroupByKey() - | beam.Map(lambda (k, v): ('%s-%s' % (k, len(v)), set(v)))) - assert_that(result, equal_to( - { - 'A-5': {1, 2, 3, 4, 5}, - # A-10, A-11 never emitted due to AfterCount(3) never firing. - 'B-4': {6, 7, 8, 9}, - 'B-3': {10, 15, 16}, - }.iteritems())) + with TestPipeline() as p: + result = (p + | beam.Create([1, 2, 3, 4, 5, 10, 11]) + | beam.FlatMap(lambda t: [('A', t), ('B', t + 5)]) + | beam.Map(lambda (k, t): TimestampedValue((k, t), t)) + | beam.WindowInto(FixedWindows(10), trigger=AfterCount(3), + accumulation_mode=AccumulationMode.DISCARDING) + | beam.GroupByKey() + | beam.Map(lambda (k, v): ('%s-%s' % (k, len(v)), set(v)))) + assert_that(result, equal_to( + { + 'A-5': {1, 2, 3, 4, 5}, + # A-10, A-11 never emitted due to AfterCount(3) never firing. + 'B-4': {6, 7, 8, 9}, + 'B-3': {10, 15, 16}, + }.iteritems())) class TranscriptTest(unittest.TestCase):
