Clean up test_stream_test and remove stray print statement
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7471e273
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7471e273
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7471e273

Branch: refs/heads/gearpump-runner
Commit: 7471e2736cc22336500f6252ab8448889a2d04d3
Parents: 2a55200
Author: Charles Chen <[email protected]>
Authored: Thu Jun 22 11:29:54 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Thu Jun 22 13:17:47 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/direct/watermark_manager.py |  1 -
 sdks/python/apache_beam/testing/test_stream_test.py | 16 ++++++----------
 2 files changed, 6 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/7471e273/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 2146bb5..4aa2bb4 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -175,7 +175,6 @@ class _TransformWatermarks(object):
   def update_timers(self, completed_timers):
     with self._lock:
       for timer_firing in completed_timers:
-        print 'REMOVE', timer_firing
         self._fired_timers.remove(timer_firing)
 
   @property

http://git-wip-us.apache.org/repos/asf/beam/blob/7471e273/sdks/python/apache_beam/testing/test_stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index bf05ac1..071c7cd 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -25,6 +25,7 @@ from apache_beam.testing.test_stream import ElementEvent
 from apache_beam.testing.test_stream import ProcessingTimeEvent
 from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.test_stream import WatermarkEvent
+from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.utils import timestamp
 from apache_beam.utils.windowed_value import WindowedValue
@@ -92,28 +93,23 @@ class TestStreamTest(unittest.TestCase):
                    .add_elements([TimestampedValue('late', 12)])
                    .add_elements([TimestampedValue('last', 310)]))
 
-    global _seen_elements  # pylint: disable=global-variable-undefined
-    _seen_elements = []
-
     class RecordFn(beam.DoFn):
       def process(self, element=beam.DoFn.ElementParam,
                   timestamp=beam.DoFn.TimestampParam):
-        _seen_elements.append((element, timestamp))
+        yield (element, timestamp)
 
     p = TestPipeline()
     my_record_fn = RecordFn()
-    p | test_stream | beam.ParDo(my_record_fn)  # pylint: disable=expression-not-assigned
-    p.run()
-
-    self.assertEqual([
+    records = p | test_stream | beam.ParDo(my_record_fn)
+    assert_that(records, equal_to([
         ('a', timestamp.Timestamp(10)),
         ('b', timestamp.Timestamp(10)),
         ('c', timestamp.Timestamp(10)),
         ('d', timestamp.Timestamp(20)),
         ('e', timestamp.Timestamp(20)),
         ('late', timestamp.Timestamp(12)),
-        ('last', timestamp.Timestamp(310)),], _seen_elements)
-    del _seen_elements
+        ('last', timestamp.Timestamp(310)),]))
+    p.run()
 
 
 if __name__ == '__main__':
