Clean up tests in pipeline_test. Notably, the runner_name parameter has been obsolete since the removal of DiskCachedRunnerPipelineTest and is an inferior version of what TestPipeline provides.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/61d8d3f0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/61d8d3f0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/61d8d3f0 Branch: refs/heads/master Commit: 61d8d3f0690142f6dc87b1484d3ebd148a706837 Parents: 9540cf1 Author: Robert Bradshaw <[email protected]> Authored: Sat Jan 21 21:07:39 2017 -0800 Committer: Robert Bradshaw <[email protected]> Committed: Wed Jan 25 12:38:03 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/pipeline_test.py | 57 ++++++++++----------------- 1 file changed, 21 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/61d8d3f0/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 93b68d1..833293f 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -38,8 +38,8 @@ from apache_beam.transforms import Read from apache_beam.transforms import WindowInto from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to -from apache_beam.transforms.window import IntervalWindow -from apache_beam.transforms.window import WindowFn +from apache_beam.transforms.window import SlidingWindows +from apache_beam.transforms.window import TimestampedValue from apache_beam.utils.timestamp import MIN_TIMESTAMP @@ -70,9 +70,6 @@ class FakeSource(NativeSource): class PipelineTest(unittest.TestCase): - def setUp(self): - self.runner_name = 'DirectRunner' - @staticmethod def custom_callable(pcoll): return pcoll | '+1' >> FlatMap(lambda x: [x + 1]) @@ -103,7 +100,7 @@ class PipelineTest(unittest.TestCase): 
self.leave_composite.append(transform_node) def test_create(self): - pipeline = TestPipeline(runner=self.runner_name) + pipeline = TestPipeline() pcoll = pipeline | 'label1' >> Create([1, 2, 3]) assert_that(pcoll, equal_to([1, 2, 3])) @@ -114,19 +111,19 @@ class PipelineTest(unittest.TestCase): pipeline.run() def test_create_singleton_pcollection(self): - pipeline = TestPipeline(runner=self.runner_name) + pipeline = TestPipeline() pcoll = pipeline | 'label' >> Create([[1, 2, 3]]) assert_that(pcoll, equal_to([[1, 2, 3]])) pipeline.run() def test_read(self): - pipeline = TestPipeline(runner=self.runner_name) + pipeline = TestPipeline() pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3])) assert_that(pcoll, equal_to([1, 2, 3])) pipeline.run() def test_visit_entire_graph(self): - pipeline = Pipeline(self.runner_name) + pipeline = Pipeline() pcoll1 = pipeline | 'pcoll' >> Create([1, 2, 3]) pcoll2 = pcoll1 | 'do1' >> FlatMap(lambda x: [x + 1]) pcoll3 = pcoll2 | 'do2' >> FlatMap(lambda x: [x + 1]) @@ -145,14 +142,14 @@ class PipelineTest(unittest.TestCase): self.assertEqual(visitor.leave_composite[0].transform, transform) def test_apply_custom_transform(self): - pipeline = TestPipeline(runner=self.runner_name) + pipeline = TestPipeline() pcoll = pipeline | 'pcoll' >> Create([1, 2, 3]) result = pcoll | PipelineTest.CustomTransform() assert_that(result, equal_to([2, 3, 4])) pipeline.run() def test_reuse_custom_transform_instance(self): - pipeline = Pipeline(self.runner_name) + pipeline = Pipeline() pcoll1 = pipeline | 'pcoll1' >> Create([1, 2, 3]) pcoll2 = pipeline | 'pcoll2' >> Create([4, 5, 6]) transform = PipelineTest.CustomTransform() @@ -167,7 +164,7 @@ class PipelineTest(unittest.TestCase): 'pvalue | "label" >> transform') def test_reuse_cloned_custom_transform_instance(self): - pipeline = TestPipeline(runner=self.runner_name) + pipeline = TestPipeline() pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3]) pcoll2 = pipeline | 'pc2' >> Create([4, 5, 6]) transform = 
PipelineTest.CustomTransform() @@ -240,7 +237,7 @@ class PipelineTest(unittest.TestCase): def raise_exception(exn): raise exn with self.assertRaises(ValueError): - with Pipeline(self.runner_name) as p: + with Pipeline() as p: # pylint: disable=expression-not-assigned p | Create([ValueError]) | Map(raise_exception) @@ -251,15 +248,12 @@ class PipelineTest(unittest.TestCase): class NewDoFnTest(unittest.TestCase): - def setUp(self): - self.runner_name = 'DirectRunner' - def test_element(self): class TestDoFn(NewDoFn): def process(self, element): yield element + 10 - pipeline = TestPipeline(runner=self.runner_name) + pipeline = TestPipeline() pcoll = pipeline | 'Create' >> Create([1, 2]) | 'Do' >> ParDo(TestDoFn()) assert_that(pcoll, equal_to([11, 12])) pipeline.run() @@ -269,7 +263,7 @@ class NewDoFnTest(unittest.TestCase): def process(self, element, context=NewDoFn.ContextParam): yield context.element + 10 - pipeline = TestPipeline(runner=self.runner_name) + pipeline = TestPipeline() pcoll = pipeline | 'Create' >> Create([1, 2])| 'Do' >> ParDo(TestDoFn()) assert_that(pcoll, equal_to([11, 12])) pipeline.run() @@ -307,25 +301,16 @@ class NewDoFnTest(unittest.TestCase): def test_window_param(self): class TestDoFn(NewDoFn): def process(self, element, window=NewDoFn.WindowParam): - yield (float(window.start), float(window.end)) - - class TestWindowFn(WindowFn): - """Windowing function adding two disjoint windows to each element.""" - - def assign(self, assign_context): - _ = assign_context - return [IntervalWindow(10, 20), IntervalWindow(20, 30)] + yield (element, (float(window.start), float(window.end))) - def merge(self, existing_windows): - return existing_windows - - pipeline = TestPipeline(runner=self.runner_name) + pipeline = TestPipeline() pcoll = (pipeline - | 'KVs' >> Create([(1, 10), (2, 20)]) - | 'W' >> WindowInto(windowfn=TestWindowFn()) - | 'Do' >> ParDo(TestDoFn())) - assert_that(pcoll, equal_to([(10.0, 20.0), (10.0, 20.0), - (20.0, 30.0), (20.0, 30.0)])) + 
| Create([1, 7]) + | Map(lambda x: TimestampedValue(x, x)) + | WindowInto(windowfn=SlidingWindows(10, 5)) + | ParDo(TestDoFn())) + assert_that(pcoll, equal_to([(1, (-5, 5)), (1, (0, 10)), + (7, (0, 10)), (7, (5, 15))])) pipeline.run() def test_timestamp_param(self): @@ -333,7 +318,7 @@ class NewDoFnTest(unittest.TestCase): def process(self, element, timestamp=NewDoFn.TimestampParam): yield timestamp - pipeline = TestPipeline(runner=self.runner_name) + pipeline = TestPipeline() pcoll = pipeline | 'Create' >> Create([1, 2]) | 'Do' >> ParDo(TestDoFn()) assert_that(pcoll, equal_to([MIN_TIMESTAMP, MIN_TIMESTAMP])) pipeline.run()
