Repository: beam
Updated Branches:
  refs/heads/master 3a8ae530c -> 3b890e838
[BEAM-1218] Move the dataflow specific parts from runner_test to dataflow_runner_test


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8435463
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8435463
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8435463

Branch: refs/heads/master
Commit: d8435463d7e5b416b383bd4caf7752d023339494
Parents: 3a8ae53
Author: Sourabh Bajaj <[email protected]>
Authored: Thu Feb 23 14:46:02 2017 -0800
Committer: Ahmet Altay <[email protected]>
Committed: Thu Feb 23 14:51:01 2017 -0800

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_runner_test.py   | 114 ++++++++++++++++++-
 sdks/python/apache_beam/runners/runner_test.py | 107 -----------------
 2 files changed, 111 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d8435463/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 4a538b1..4a0815a 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -17,31 +17,49 @@
 """Unit tests for the DataflowRunner class."""
 
+import json
 import unittest
+from datetime import datetime
 
 import mock
 
+import apache_beam as beam
+import apache_beam.transforms as ptransform
+
+from apache_beam.pipeline import Pipeline
+from apache_beam.runners import create_runner
+from apache_beam.runners import DataflowRunner
+from apache_beam.runners import TestDataflowRunner
 from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
 from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
+from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.utils.pipeline_options import PipelineOptions
 
 # Protect against environments where apitools library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
 try:
-  from apitools.base.py import base_api
+  from apache_beam.runners.dataflow.internal import apiclient
 except ImportError:
-  base_api = None
+  apiclient = None
 # pylint: enable=wrong-import-order, wrong-import-position
 
 
+@unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
 class DataflowRunnerTest(unittest.TestCase):
+  default_properties = [
+      '--dataflow_endpoint=ignored',
+      '--job_name=test-job',
+      '--project=test-project',
+      '--staging_location=ignored',
+      '--temp_location=/dev/null',
+      '--no_auth=True']
 
   def test_dataflow_runner_has_metrics(self):
     df_result = DataflowPipelineResult('somejob', 'somerunner')
     self.assertTrue(df_result.metrics())
     self.assertTrue(df_result.metrics().query())
 
-  @unittest.skipIf(base_api is None, 'GCP dependencies are not installed')
   @mock.patch('time.sleep', return_value=None)
   def test_wait_until_finish(self, patched_time_sleep):
     values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
@@ -73,6 +91,96 @@ class DataflowRunnerTest(unittest.TestCase):
             succeeded_runner.job, succeeded_runner)
     succeeded_result.wait_until_finish()
 
+  def test_create_runner(self):
+    self.assertTrue(
+        isinstance(create_runner('DataflowRunner'),
+                   DataflowRunner))
+    self.assertTrue(
+        isinstance(create_runner('TestDataflowRunner'),
+                   TestDataflowRunner))
+
+  def test_remote_runner_translation(self):
+    remote_runner = DataflowRunner()
+    p = Pipeline(remote_runner,
+                 options=PipelineOptions(self.default_properties))
+
+    (p | ptransform.Create([1, 2, 3])  # pylint: disable=expression-not-assigned
+     | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
+     | ptransform.GroupByKey())
+    remote_runner.job = apiclient.Job(p.options)
+    super(DataflowRunner, remote_runner).run(p)
+
+  def test_remote_runner_display_data(self):
+    remote_runner = DataflowRunner()
+    p = Pipeline(remote_runner,
+                 options=PipelineOptions(self.default_properties))
+
+    # TODO: Should not subclass ParDo. Switch to PTransform as soon as
+    # composite transforms support display data.
+    class SpecialParDo(beam.ParDo):
+      def __init__(self, fn, now):
+        super(SpecialParDo, self).__init__(fn)
+        self.fn = fn
+        self.now = now
+
+      # Make this a list to be accessible within closure
+      def display_data(self):
+        return {'asubcomponent': self.fn,
+                'a_class': SpecialParDo,
+                'a_time': self.now}
+
+    class SpecialDoFn(beam.DoFn):
+      def display_data(self):
+        return {'dofn_value': 42}
+
+      def process(self):
+        pass
+
+    now = datetime.now()
+    # pylint: disable=expression-not-assigned
+    (p | ptransform.Create([1, 2, 3, 4, 5])
+     | 'Do' >> SpecialParDo(SpecialDoFn(), now))
+
+    remote_runner.job = apiclient.Job(p.options)
+    super(DataflowRunner, remote_runner).run(p)
+    job_dict = json.loads(str(remote_runner.job))
+    steps = [step
+             for step in job_dict['steps']
+             if len(step['properties'].get('display_data', [])) > 0]
+    step = steps[0]
+    disp_data = step['properties']['display_data']
+    disp_data = sorted(disp_data, key=lambda x: x['namespace']+x['key'])
+    nspace = SpecialParDo.__module__+ '.'
+    expected_data = [{'type': 'TIMESTAMP', 'namespace': nspace+'SpecialParDo',
+                      'value': DisplayDataItem._format_value(now, 'TIMESTAMP'),
+                      'key': 'a_time'},
+                     {'type': 'STRING', 'namespace': nspace+'SpecialParDo',
+                      'value': nspace+'SpecialParDo', 'key': 'a_class',
+                      'shortValue': 'SpecialParDo'},
+                     {'type': 'INTEGER', 'namespace': nspace+'SpecialDoFn',
+                      'value': 42, 'key': 'dofn_value'}]
+    expected_data = sorted(expected_data, key=lambda x: x['namespace']+x['key'])
+    self.assertEqual(len(disp_data), 3)
+    self.assertEqual(disp_data, expected_data)
+
+  def test_no_group_by_key_directly_after_bigquery(self):
+    remote_runner = DataflowRunner()
+    p = Pipeline(remote_runner,
+                 options=PipelineOptions([
+                     '--dataflow_endpoint=ignored',
+                     '--job_name=test-job',
+                     '--project=test-project',
+                     '--staging_location=ignored',
+                     '--temp_location=/dev/null',
+                     '--no_auth=True'
+                 ]))
+    rows = p | beam.io.Read(beam.io.BigQuerySource('dataset.faketable'))
+    with self.assertRaises(ValueError,
+                           msg=('Coder for the GroupByKey operation'
+                                '"GroupByKey" is not a key-value coder: '
+                                'RowAsDictJsonCoder')):
+      unused_invalid = rows | beam.GroupByKey()
+
 
 if __name__ == '__main__':
   unittest.main()


http://git-wip-us.apache.org/repos/asf/beam/blob/d8435463/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index d39c6eb..b161cbb 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -22,9 +22,7 @@ the other unit tests. In this file we choose to test only aspects related to
 caching and clearing values that are not tested elsewhere.
 """
 
-import json
 import unittest
-from datetime import datetime
 
 import hamcrest as hc
 
@@ -38,22 +36,10 @@ from apache_beam.metrics.metricbase import MetricName
 from apache_beam.pipeline import Pipeline
 from apache_beam.runners import DirectRunner
 from apache_beam.runners import create_runner
-from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.runners import DataflowRunner
-from apache_beam.runners import TestDataflowRunner
-
-# Protect against environments where api client is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
-try:
-  from apache_beam.runners.dataflow.internal import apiclient
-except ImportError:
-  apiclient = None
-# pylint: enable=wrong-import-order, wrong-import-position
-
 
 
 class RunnerTest(unittest.TestCase):
   default_properties = [
@@ -67,14 +53,6 @@ class RunnerTest(unittest.TestCase):
   def test_create_runner(self):
     self.assertTrue(
         isinstance(create_runner('DirectRunner'), DirectRunner))
-    if apiclient is not None:
-      self.assertTrue(
-          isinstance(create_runner('DataflowRunner'),
-                     DataflowRunner))
-    if apiclient is not None:
-      self.assertTrue(
-          isinstance(create_runner('TestDataflowRunner'),
-                     TestDataflowRunner))
     self.assertRaises(ValueError, create_runner, 'xyz')
 
   def test_create_runner_shorthand(self):
@@ -89,72 +67,6 @@ class RunnerTest(unittest.TestCase):
     self.assertTrue(
         isinstance(create_runner('Direct'), DirectRunner))
 
-  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
-  def test_remote_runner_translation(self):
-    remote_runner = DataflowRunner()
-    p = Pipeline(remote_runner,
-                 options=PipelineOptions(self.default_properties))
-
-    (p | ptransform.Create([1, 2, 3])  # pylint: disable=expression-not-assigned
-     | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
-     | ptransform.GroupByKey())
-    remote_runner.job = apiclient.Job(p.options)
-    super(DataflowRunner, remote_runner).run(p)
-
-  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
-  def test_remote_runner_display_data(self):
-    remote_runner = DataflowRunner()
-    p = Pipeline(remote_runner,
-                 options=PipelineOptions(self.default_properties))
-
-    # TODO: Should not subclass ParDo. Switch to PTransform as soon as
-    # composite transforms support display data.
-    class SpecialParDo(beam.ParDo):
-      def __init__(self, fn, now):
-        super(SpecialParDo, self).__init__(fn)
-        self.fn = fn
-        self.now = now
-
-      # Make this a list to be accessible within closure
-      def display_data(self):
-        return {'asubcomponent': self.fn,
-                'a_class': SpecialParDo,
-                'a_time': self.now}
-
-    class SpecialDoFn(beam.DoFn):
-      def display_data(self):
-        return {'dofn_value': 42}
-
-      def process(self):
-        pass
-
-    now = datetime.now()
-    # pylint: disable=expression-not-assigned
-    (p | ptransform.Create([1, 2, 3, 4, 5])
-     | 'Do' >> SpecialParDo(SpecialDoFn(), now))
-
-    remote_runner.job = apiclient.Job(p.options)
-    super(DataflowRunner, remote_runner).run(p)
-    job_dict = json.loads(str(remote_runner.job))
-    steps = [step
-             for step in job_dict['steps']
-             if len(step['properties'].get('display_data', [])) > 0]
-    step = steps[0]
-    disp_data = step['properties']['display_data']
-    disp_data = sorted(disp_data, key=lambda x: x['namespace']+x['key'])
-    nspace = SpecialParDo.__module__+ '.'
-    expected_data = [{'type': 'TIMESTAMP', 'namespace': nspace+'SpecialParDo',
-                      'value': DisplayDataItem._format_value(now, 'TIMESTAMP'),
-                      'key': 'a_time'},
-                     {'type': 'STRING', 'namespace': nspace+'SpecialParDo',
-                      'value': nspace+'SpecialParDo', 'key': 'a_class',
-                      'shortValue': 'SpecialParDo'},
-                     {'type': 'INTEGER', 'namespace': nspace+'SpecialDoFn',
-                      'value': 42, 'key': 'dofn_value'}]
-    expected_data = sorted(expected_data, key=lambda x: x['namespace']+x['key'])
-    self.assertEqual(len(disp_data), 3)
-    self.assertEqual(disp_data, expected_data)
-
 
   def test_direct_runner_metrics(self):
     from apache_beam.metrics.metric import Metrics
@@ -206,25 +118,6 @@ class RunnerTest(unittest.TestCase):
             DistributionResult(DistributionData(15, 5, 1, 5)),
             DistributionResult(DistributionData(15, 5, 1, 5)))))
 
-  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
-  def test_no_group_by_key_directly_after_bigquery(self):
-    remote_runner = DataflowRunner()
-    p = Pipeline(remote_runner,
-                 options=PipelineOptions([
-                     '--dataflow_endpoint=ignored',
-                     '--job_name=test-job',
-                     '--project=test-project',
-                     '--staging_location=ignored',
-                     '--temp_location=/dev/null',
-                     '--no_auth=True'
-                 ]))
-    rows = p | beam.io.Read(beam.io.BigQuerySource('dataset.faketable'))
-    with self.assertRaises(ValueError,
-                           msg=('Coder for the GroupByKey operation'
-                                '"GroupByKey" is not a key-value coder: '
-                                'RowAsDictJsonCoder')):
-      unused_invalid = rows | beam.GroupByKey()
-
 
 if __name__ == '__main__':
   unittest.main()
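
Note (not part of the commit): the relocated tests rely on the optional-dependency guard visible in the diff above, i.e. importing apiclient inside try/except and skipping the whole test case when the GCP extras are missing. A minimal standalone sketch of that pattern follows; the class and method names here are illustrative only.

    import unittest

    # Guard the GCP-only import so this module still loads in environments
    # without the optional Dataflow/GCP dependencies installed.
    try:
      from apache_beam.runners.dataflow.internal import apiclient
    except ImportError:
      apiclient = None


    @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
    class ExampleGcpOnlyTest(unittest.TestCase):

      def test_guard_is_active(self):
        # Runs only when the apiclient import above succeeded.
        self.assertIsNotNone(apiclient)


    if __name__ == '__main__':
      unittest.main()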
