Update tests to use TestPipeline()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4ded9185 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4ded9185 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4ded9185 Branch: refs/heads/python-sdk Commit: 4ded9185c063eae9f54ce457a1fa753679d1ce82 Parents: 703c1bc Author: Ahmet Altay <[email protected]> Authored: Sun Jan 15 00:35:49 2017 -0800 Committer: Robert Bradshaw <[email protected]> Committed: Wed Jan 18 10:01:41 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/pipeline_test.py | 13 +-- .../apache_beam/tests/pipeline_verifiers.py | 4 +- .../apache_beam/transforms/aggregator_test.py | 3 +- .../apache_beam/transforms/combiners_test.py | 26 +++--- .../python/apache_beam/transforms/ptransform.py | 2 +- .../apache_beam/transforms/ptransform_test.py | 93 ++++++++++---------- .../apache_beam/transforms/window_test.py | 10 +-- .../transforms/write_ptransform_test.py | 5 +- .../typehints/typed_pipeline_test.py | 6 +- 9 files changed, 81 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index d6925d4..336bf54 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -25,6 +25,7 @@ from apache_beam.pipeline import Pipeline from apache_beam.pipeline import PipelineOptions from apache_beam.pipeline import PipelineVisitor from apache_beam.runners.dataflow.native_io.iobase import NativeSource +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms import CombineGlobally from apache_beam.transforms import Create from apache_beam.transforms import FlatMap @@ -94,7 +95,7 @@ class PipelineTest(unittest.TestCase): self.leave_composite.append(transform_node) def test_create(self): - pipeline = Pipeline(self.runner_name) + pipeline = TestPipeline(runner=self.runner_name) pcoll = pipeline | 'label1' >> Create([1, 2, 3]) assert_that(pcoll, equal_to([1, 2, 3])) @@ -105,13 +106,13 @@ class PipelineTest(unittest.TestCase): pipeline.run() def test_create_singleton_pcollection(self): - pipeline = Pipeline(self.runner_name) + pipeline = TestPipeline(runner=self.runner_name) pcoll = pipeline | 'label' >> Create([[1, 2, 3]]) assert_that(pcoll, equal_to([[1, 2, 3]])) pipeline.run() def test_read(self): - pipeline = Pipeline(self.runner_name) + pipeline = TestPipeline(runner=self.runner_name) pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3])) assert_that(pcoll, equal_to([1, 2, 3])) pipeline.run() @@ -136,7 +137,7 @@ class PipelineTest(unittest.TestCase): self.assertEqual(visitor.leave_composite[0].transform, transform) def test_apply_custom_transform(self): - pipeline = Pipeline(self.runner_name) + pipeline = TestPipeline(runner=self.runner_name) pcoll = pipeline | 'pcoll' >> Create([1, 2, 3]) result = pcoll | PipelineTest.CustomTransform() assert_that(result, equal_to([2, 3, 4])) @@ -158,7 +159,7 @@ class PipelineTest(unittest.TestCase): 'pvalue | "label" >> transform') def test_reuse_cloned_custom_transform_instance(self): - pipeline = Pipeline(self.runner_name) + pipeline = TestPipeline(runner=self.runner_name) pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3]) pcoll2 = pipeline | 'pc2' >> Create([4, 5, 6]) transform = PipelineTest.CustomTransform() @@ -207,7 +208,7 @@ class PipelineTest(unittest.TestCase): num_elements = 10 num_maps = 100 - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline(runner='DirectRunner') # Consumed memory should not be proportional to the number of maps. memory_threshold = ( http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/tests/pipeline_verifiers.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py index 6bf8d48..9b286a2 100644 --- a/sdks/python/apache_beam/tests/pipeline_verifiers.py +++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py @@ -46,7 +46,7 @@ class PipelineStateMatcher(BaseMatcher): self.expected_state = expected_state def _matches(self, pipeline_result): - return pipeline_result.current_state() == self.expected_state + return pipeline_result.state == self.expected_state def describe_to(self, description): description \ @@ -56,7 +56,7 @@ class PipelineStateMatcher(BaseMatcher): def describe_mismatch(self, pipeline_result, mismatch_description): mismatch_description \ .append_text("Test pipeline job terminated in state: ") \ - .append_text(pipeline_result.current_state()) + .append_text(pipeline_result.state) def retry_on_io_error_and_server_error(exception): http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/aggregator_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/aggregator_test.py b/sdks/python/apache_beam/transforms/aggregator_test.py index e77dfba..d493c46 100644 --- a/sdks/python/apache_beam/transforms/aggregator_test.py +++ b/sdks/python/apache_beam/transforms/aggregator_test.py @@ -22,6 +22,7 @@ import unittest import apache_beam as beam from apache_beam.transforms import combiners from apache_beam.transforms.aggregator import Aggregator +from apache_beam.test_pipeline import TestPipeline class AggregatorTest(unittest.TestCase): @@ -63,7 +64,7 @@ class AggregatorTest(unittest.TestCase): for a in aggregators: context.aggregate_to(a, context.element) - p = beam.Pipeline('DirectRunner') + p = TestPipeline() p | beam.Create([0, 1, 2, 3]) | beam.ParDo(UpdateAggregators()) # pylint: disable=expression-not-assigned res = p.run() for (_, _, expected), a in zip(counter_types, aggregators): http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 72fce60..8a6d352 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -22,7 +22,7 @@ import unittest import hamcrest as hc import apache_beam as beam -from apache_beam.pipeline import Pipeline +from apache_beam.test_pipeline import TestPipeline import apache_beam.transforms.combiners as combine from apache_beam.transforms.core import CombineGlobally from apache_beam.transforms.core import Create @@ -40,7 +40,7 @@ class CombineTest(unittest.TestCase): combine.TopCombineFn._MIN_BUFFER_OVERSIZE = 1 def test_builtin_combines(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() vals = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6] mean = sum(vals) / float(len(vals)) @@ -62,7 +62,7 @@ class CombineTest(unittest.TestCase): pipeline.run() def test_top(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() # A parameter we'll be sharing with a custom comparator. names = {0: 'zo', @@ -201,7 +201,7 @@ class CombineTest(unittest.TestCase): hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) def test_top_shorthands(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6]) result_top = pcoll | 'top' >> beam.CombineGlobally(combine.Largest(5)) @@ -222,7 +222,7 @@ class CombineTest(unittest.TestCase): # First test global samples (lots of them). for ix in xrange(300): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> Create([1, 1, 2, 2]) result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3) @@ -241,7 +241,7 @@ class CombineTest(unittest.TestCase): pipeline.run() # Now test per-key samples. - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start-perkey' >> Create( sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), [])) result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3) @@ -258,7 +258,7 @@ class CombineTest(unittest.TestCase): pipeline.run() def test_tuple_combine_fn(self): - p = Pipeline('DirectRunner') + p = TestPipeline() result = ( p | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)]) @@ -269,7 +269,7 @@ class CombineTest(unittest.TestCase): p.run() def test_tuple_combine_fn_without_defaults(self): - p = Pipeline('DirectRunner') + p = TestPipeline() result = ( p | Create([1, 1, 2, 3]) @@ -280,7 +280,7 @@ class CombineTest(unittest.TestCase): p.run() def test_to_list_and_to_dict(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6] pcoll = pipeline | 'start' >> Create(the_list) result = pcoll | 'to list' >> combine.ToList() @@ -292,7 +292,7 @@ class CombineTest(unittest.TestCase): assert_that(result, matcher([the_list])) pipeline.run() - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pairs = [(1, 2), (3, 4), (5, 6)] pcoll = pipeline | 'start-pairs' >> Create(pairs) result = pcoll | 'to dict' >> combine.ToDict() @@ -306,12 +306,12 @@ class CombineTest(unittest.TestCase): pipeline.run() def test_combine_globally_with_default(self): - p = Pipeline('DirectRunner') + p = TestPipeline() assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0])) p.run() def test_combine_globally_without_default(self): - p = Pipeline('DirectRunner') + p = TestPipeline() result = p | Create([]) | CombineGlobally(sum).without_defaults() assert_that(result, equal_to([])) p.run() @@ -323,7 +323,7 @@ class CombineTest(unittest.TestCase): main = pcoll.pipeline | Create([None]) return main | Map(lambda _, s: s, side) - p = Pipeline('DirectRunner') + p = TestPipeline() result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput() result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput() assert_that(result1, equal_to([0]), label='r1') http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 006a937..b5ac64b 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -443,7 +443,7 @@ class PTransform(WithTypeHints, HasDisplayData): # Get a reference to the runners internal cache, otherwise runner may # clean it after run. cache = p.runner.cache - p.run() + p.run().wait_until_finish() return _MaterializePValues(cache).visit(result) def _extract_input_pvalues(self, pvalueish): http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 705e85e..58382e4 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -26,7 +26,7 @@ import unittest import hamcrest as hc import apache_beam as beam -from apache_beam.pipeline import Pipeline +from apache_beam.test_pipeline import TestPipeline import apache_beam.pvalue as pvalue import apache_beam.transforms.combiners as combine from apache_beam.transforms.display import DisplayData, DisplayDataItem @@ -36,7 +36,6 @@ import apache_beam.typehints as typehints from apache_beam.typehints import with_input_types from apache_beam.typehints import with_output_types from apache_beam.typehints.typehints_test import TypeHintTestCase -from apache_beam.utils.pipeline_options import PipelineOptions from apache_beam.utils.pipeline_options import TypeOptions @@ -54,12 +53,12 @@ class PTransformTest(unittest.TestCase): self.assertEqual('<PTransform(PTransform) label=[PTransform]>', str(PTransform())) - pa = Pipeline('DirectRunner') + pa = TestPipeline() res = pa | 'a_label' >> beam.Create([1, 2]) self.assertEqual('AppliedPTransform(a_label, Create)', str(res.producer)) - pc = Pipeline('DirectRunner') + pc = TestPipeline() res = pc | beam.Create([1, 2]) inputs_tr = res.producer.transform inputs_tr.inputs = ('ci',) @@ -67,7 +66,7 @@ class PTransformTest(unittest.TestCase): """<Create(PTransform) label=[Create] inputs=('ci',)>""", str(inputs_tr)) - pd = Pipeline('DirectRunner') + pd = TestPipeline() res = pd | beam.Create([1, 2]) side_tr = res.producer.transform side_tr.side_inputs = (4,) @@ -111,7 +110,7 @@ class PTransformTest(unittest.TestCase): def process(self, context, addon): return [context.element + addon] - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) result = pcoll | 'do' >> beam.ParDo(AddNDoFn(), 10) assert_that(result, equal_to([11, 12, 13])) @@ -123,20 +122,20 @@ class PTransformTest(unittest.TestCase): def process(self, context): pass - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) with self.assertRaises(ValueError): pcoll | 'do' >> beam.ParDo(MyDoFn) # Note the lack of ()'s def test_do_with_callable(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) result = pcoll | 'do' >> beam.FlatMap(lambda x, addon: [x + addon], 10) assert_that(result, equal_to([11, 12, 13])) pipeline.run() def test_do_with_side_input_as_arg(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() side = pipeline | 'side' >> beam.Create([10]) pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) result = pcoll | beam.FlatMap( @@ -145,7 +144,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_do_with_side_input_as_keyword_arg(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() side = pipeline | 'side' >> beam.Create([10]) pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) result = pcoll | beam.FlatMap( @@ -154,7 +153,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_do_with_do_fn_returning_string_raises_warning(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3']) pcoll | 'do' >> beam.FlatMap(lambda x: x + '1') @@ -168,7 +167,7 @@ class PTransformTest(unittest.TestCase): self.assertStartswith(cm.exception.message, expected_error_prefix) def test_do_with_do_fn_returning_dict_raises_warning(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3']) pcoll | 'do' >> beam.FlatMap(lambda x: {x: '1'}) @@ -182,7 +181,7 @@ class PTransformTest(unittest.TestCase): self.assertStartswith(cm.exception.message, expected_error_prefix) def test_do_with_side_outputs_maintains_unique_name(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) r1 = pcoll | 'a' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m') r2 = pcoll | 'b' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m') @@ -195,7 +194,7 @@ class PTransformTest(unittest.TestCase): # iterable. def incorrect_par_do_fn(x): return x + 5 - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create([2, 9, 3]) pcoll | 'do' >> beam.FlatMap(incorrect_par_do_fn) # It's a requirement that all user-defined functions to a ParDo return @@ -216,7 +215,7 @@ class PTransformTest(unittest.TestCase): def finish_bundle(self, c): yield 'finish' - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) result = pcoll | 'do' >> beam.ParDo(MyDoFn()) @@ -231,7 +230,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_filter(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create([1, 2, 3, 4]) result = pcoll | beam.Filter( 'filter', lambda x: x % 2 == 0) @@ -257,7 +256,7 @@ class PTransformTest(unittest.TestCase): def test_combine_with_combine_fn(self): vals = [1, 2, 3, 4, 5, 6, 7] - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create(vals) result = pcoll | 'mean' >> beam.CombineGlobally(self._MeanCombineFn()) assert_that(result, equal_to([sum(vals) / len(vals)])) @@ -265,7 +264,7 @@ class PTransformTest(unittest.TestCase): def test_combine_with_callable(self): vals = [1, 2, 3, 4, 5, 6, 7] - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create(vals) result = pcoll | beam.CombineGlobally(sum) assert_that(result, equal_to([sum(vals)])) @@ -273,7 +272,7 @@ class PTransformTest(unittest.TestCase): def test_combine_with_side_input_as_arg(self): values = [1, 2, 3, 4, 5, 6, 7] - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create(values) divisor = pipeline | 'divisor' >> beam.Create([2]) result = pcoll | beam.CombineGlobally( @@ -288,7 +287,7 @@ class PTransformTest(unittest.TestCase): def test_combine_per_key_with_combine_fn(self): vals_1 = [1, 2, 3, 4, 5, 6, 7] vals_2 = [2, 4, 6, 8, 10, 12, 14] - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn()) @@ -299,7 +298,7 @@ class PTransformTest(unittest.TestCase): def test_combine_per_key_with_callable(self): vals_1 = [1, 2, 3, 4, 5, 6, 7] vals_2 = [2, 4, 6, 8, 10, 12, 14] - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) result = pcoll | beam.CombinePerKey(sum) @@ -309,7 +308,7 @@ class PTransformTest(unittest.TestCase): def test_combine_per_key_with_side_input_as_arg(self): vals_1 = [1, 2, 3, 4, 5, 6, 7] vals_2 = [2, 4, 6, 8, 10, 12, 14] - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) divisor = pipeline | 'divisor' >> beam.Create([2]) @@ -322,7 +321,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_group_by_key(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | beam.Create( 'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]) result = pcoll | 'group' >> beam.GroupByKey() @@ -336,7 +335,7 @@ class PTransformTest(unittest.TestCase): def partition_for(self, context, num_partitions, offset): return (context.element % 3) + offset - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) # Attempt nominal partition operation. partitions = pcoll | 'part1' >> beam.Partition(SomePartitionFn(), 4, 1) @@ -348,14 +347,14 @@ class PTransformTest(unittest.TestCase): # Check that a bad partition label will yield an error. For the # DirectRunner, this error manifests as an exception. - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) partitions = pcoll | 'part2' >> beam.Partition(SomePartitionFn(), 4, 10000) with self.assertRaises(ValueError): pipeline.run() def test_partition_with_callable(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) partitions = ( pcoll | beam.Partition( @@ -370,7 +369,7 @@ class PTransformTest(unittest.TestCase): def test_partition_followed_by_flatten_and_groupbykey(self): """Regression test for an issue with how partitions are handled.""" - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() contents = [('aa', 1), ('bb', 2), ('aa', 2)] created = pipeline | 'A' >> beam.Create(contents) partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3) @@ -380,7 +379,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_flatten_pcollections(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3]) pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7]) result = (pcoll_1, pcoll_2) | 'flatten' >> beam.Flatten() @@ -388,7 +387,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_flatten_no_pcollections(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() with self.assertRaises(ValueError): () | 'pipeline arg missing' >> beam.Flatten() result = () | 'empty' >> beam.Flatten(pipeline=pipeline) @@ -396,7 +395,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_flatten_pcollections_in_iterable(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3]) pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7]) result = ([pcoll for pcoll in (pcoll_1, pcoll_2)] @@ -417,7 +416,7 @@ class PTransformTest(unittest.TestCase): set([1, 2, 3]) | 'flatten' >> beam.Flatten() def test_co_group_by_key_on_list(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll_1 = pipeline | beam.Create( 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) pcoll_2 = pipeline | beam.Create( @@ -429,7 +428,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_co_group_by_key_on_iterable(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll_1 = pipeline | beam.Create( 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) pcoll_2 = pipeline | beam.Create( @@ -442,7 +441,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_co_group_by_key_on_dict(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll_1 = pipeline | beam.Create( 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) pcoll_2 = pipeline | beam.Create( @@ -454,7 +453,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_group_by_key_input_must_be_kv_pairs(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcolls = pipeline | 'A' >> beam.Create([1, 2, 3, 4, 5]) with self.assertRaises(typehints.TypeCheckError) as e: @@ -467,7 +466,7 @@ class PTransformTest(unittest.TestCase): 'Tuple[TypeVariable[K], TypeVariable[V]]') def test_group_by_key_only_input_must_be_kv_pairs(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f']) with self.assertRaises(typehints.TypeCheckError) as cm: pcolls | 'D' >> beam.GroupByKeyOnly() @@ -478,7 +477,7 @@ class PTransformTest(unittest.TestCase): self.assertStartswith(cm.exception.message, expected_error_prefix) def test_keys_and_values(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | beam.Create( 'start', [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)]) keys = pcoll.apply('keys', beam.Keys()) @@ -488,7 +487,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_kv_swap(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | beam.Create( 'start', [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)]) result = pcoll.apply('swap', beam.KvSwap()) @@ -496,7 +495,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_remove_duplicates(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | beam.Create( 'start', [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel']) result = pcoll.apply('nodupes', beam.RemoveDuplicates()) @@ -504,7 +503,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_chained_ptransforms(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() t = (beam.Map(lambda x: (x, 1)) | beam.GroupByKey() | beam.Map(lambda (x, ones): (x, sum(ones)))) @@ -581,7 +580,7 @@ class PTransformLabelsTest(unittest.TestCase): def test_chained_ptransforms(self): """Tests that chaining gets proper nesting.""" - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() map1 = beam.Map('map1', lambda x: (x, 1)) gbk = beam.GroupByKey('gbk') map2 = beam.Map('map2', lambda (x, ones): (x, sum(ones))) @@ -594,7 +593,7 @@ class PTransformLabelsTest(unittest.TestCase): pipeline.run() def test_apply_custom_transform_without_label(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3]) custom = PTransformLabelsTest.CustomTransform() result = pipeline.apply(custom, pcoll) @@ -604,7 +603,7 @@ class PTransformLabelsTest(unittest.TestCase): pipeline.run() def test_apply_custom_transform_with_label(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3]) custom = PTransformLabelsTest.CustomTransform('*custom*') result = pipeline.apply(custom, pcoll) @@ -615,7 +614,7 @@ class PTransformLabelsTest(unittest.TestCase): def test_combine_without_label(self): vals = [1, 2, 3, 4, 5, 6, 7] - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create(vals) combine = beam.CombineGlobally(sum) result = pcoll | combine @@ -624,7 +623,7 @@ class PTransformLabelsTest(unittest.TestCase): pipeline.run() def test_apply_ptransform_using_decorator(self): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3]) sample = SamplePTransform('*sample*') _ = pcoll | sample @@ -635,7 +634,7 @@ class PTransformLabelsTest(unittest.TestCase): def test_combine_with_label(self): vals = [1, 2, 3, 4, 5, 6, 7] - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create(vals) combine = beam.CombineGlobally('*sum*', sum) result = pcoll | combine @@ -644,7 +643,7 @@ class PTransformLabelsTest(unittest.TestCase): pipeline.run() def check_label(self, ptransform, expected_label): - pipeline = Pipeline('DirectRunner') + pipeline = TestPipeline() pipeline | 'start' >> beam.Create([('a', 1)]) | ptransform actual_label = sorted(pipeline.applied_labels - {'start'})[0] self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label)) @@ -728,7 +727,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): '"%s" does not start with "%s"' % (msg, prefix)) def setUp(self): - self.p = Pipeline(options=PipelineOptions([])) + self.p = TestPipeline() def test_do_fn_pipeline_pipeline_type_check_satisfied(self): @with_input_types(int, int) http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/window_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 856d011..d4e8e25 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -19,7 +19,7 @@ import unittest -from apache_beam.pipeline import Pipeline +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms import CombinePerKey from apache_beam.transforms import combiners from apache_beam.transforms import core @@ -140,7 +140,7 @@ class WindowTest(unittest.TestCase): | Map(lambda x: WindowedValue((key, x), x, []))) def test_sliding_windows(self): - p = Pipeline('DirectRunner') + p = TestPipeline() pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3) result = (pcoll | 'w' >> WindowInto(SlidingWindows(period=2, size=4)) @@ -153,7 +153,7 @@ class WindowTest(unittest.TestCase): p.run() def test_sessions(self): - p = Pipeline('DirectRunner') + p = TestPipeline() pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27) result = (pcoll | 'w' >> WindowInto(Sessions(10)) @@ -166,7 +166,7 @@ class WindowTest(unittest.TestCase): p.run() def test_timestamped_value(self): - p = Pipeline('DirectRunner') + p = TestPipeline() result = (p | 'start' >> Create([(k, k) for k in range(10)]) | Map(lambda (x, t): TimestampedValue(x, t)) @@ -178,7 +178,7 @@ class WindowTest(unittest.TestCase): p.run() def test_timestamped_with_combiners(self): - p = Pipeline('DirectRunner') + p = TestPipeline() result = (p # Create some initial test values. | 'start' >> Create([(k, k) for k in range(10)]) http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/write_ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py index f96dffb..3d7fbd9 100644 --- a/sdks/python/apache_beam/transforms/write_ptransform_test.py +++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py @@ -22,10 +22,9 @@ import unittest import apache_beam as beam from apache_beam.io import iobase -from apache_beam.pipeline import Pipeline +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.util import assert_that, is_empty -from apache_beam.utils.pipeline_options import PipelineOptions class _TestSink(iobase.Sink): @@ -99,7 +98,7 @@ class WriteTest(unittest.TestCase): return_write_results=True): write_to_test_sink = WriteToTestSink(return_init_result, return_write_results) - p = Pipeline(options=PipelineOptions([])) + p = TestPipeline() result = p | beam.Create(data) | write_to_test_sink | beam.Map(list) assert_that(result, is_empty()) http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/typehints/typed_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 8b5e3f4..35987b7 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -23,10 +23,10 @@ import unittest import apache_beam as beam from apache_beam import pvalue from apache_beam import typehints +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that, equal_to from apache_beam.typehints import WithTypeHints from apache_beam.utils.pipeline_options import OptionsContext -from apache_beam.utils.pipeline_options import PipelineOptions # These test often construct a pipeline as value | PTransform to test side # effects (e.g. errors). @@ -168,7 +168,7 @@ class SideInputTest(unittest.TestCase): @typehints.with_input_types(str, int) def repeat(s, times): return s * times - p = beam.Pipeline(options=PipelineOptions([])) + p = TestPipeline() main_input = p | beam.Create(['a', 'bb', 'c']) side_input = p | 'side' >> beam.Create([3]) result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input)) @@ -183,7 +183,7 @@ class SideInputTest(unittest.TestCase): @typehints.with_input_types(str, typehints.Iterable[str]) def concat(glue, items): return glue.join(sorted(items)) - p = beam.Pipeline(options=PipelineOptions([])) + p = TestPipeline() main_input = p | beam.Create(['a', 'bb', 'c']) side_input = p | 'side' >> beam.Create(['x', 'y', 'z']) result = main_input | beam.Map(concat, pvalue.AsIter(side_input))
