Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 57f03f793 -> 03662da9d
Addressing comments Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1d478dd4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1d478dd4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1d478dd4 Branch: refs/heads/python-sdk Commit: 1d478dd43dc1c54c1ef0e70e4aa6c2a548c387fa Parents: bfcff01 Author: Pablo <[email protected]> Authored: Thu Oct 20 13:15:00 2016 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Thu Oct 27 10:39:32 2016 -0700 ---------------------------------------------------------------------- .../apache_beam/runners/dataflow_runner.py | 6 ++++- sdks/python/apache_beam/runners/runner_test.py | 6 +++-- sdks/python/apache_beam/transforms/display.py | 24 +++++++++++--------- .../apache_beam/transforms/display_test.py | 13 +++++------ 4 files changed, 28 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1d478dd4/sdks/python/apache_beam/runners/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index c543d2f..a387332 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -264,9 +264,13 @@ class DataflowPipelineRunner(PipelineRunner): for tag in side_tags: self._cache.cache_output(transform_node, tag, step) + # Finally, we add the display data items to the pipeline step. + # If the transform contains no display data then an empty list is added. 
step.add_property( PropertyNames.DISPLAY_DATA, - DisplayData.create_from(transform_node.transform).output()) + [item.get_dict() for item in + DisplayData.create_from(transform_node.transform).items]) + return step def run_Create(self, transform_node): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1d478dd4/sdks/python/apache_beam/runners/runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index 891f9dc..0ba42d3 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -74,9 +74,11 @@ class RunnerTest(unittest.TestCase): p = Pipeline(remote_runner, options=PipelineOptions(self.default_properties)) + # TODO: Should not subclass ParDo. Switch to PTransform as soon as + # composite transforms support display data. class SpecialParDo(beam.ParDo): - def __init__(self, fn, now, *args, **kwargs): - super(SpecialParDo, self).__init__(fn, *args, **kwargs) + def __init__(self, fn, now): + super(SpecialParDo, self).__init__(fn) self.fn = fn self.now = now http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1d478dd4/sdks/python/apache_beam/transforms/display.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index 372cbf5..71fd34e 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -80,9 +80,9 @@ class DisplayData(object): def __init__(self, namespace, display_data_dict): self.namespace = namespace self.items = [] - self.populate_items(display_data_dict) + self._populate_items(display_data_dict) - def populate_items(self, display_data_dict): + def _populate_items(self, display_data_dict): """ Populates the list of display data items. 
""" for key, element in display_data_dict.items(): @@ -105,11 +105,6 @@ class DisplayData(object): namespace=self.namespace, key=key)) - def output(self): - """ Returns the JSON-API list of display data items to send to the runner. - """ - return [item.get_dict() for item in self.items] - @classmethod def create_from(cls, has_display_data): """ Creates DisplayData from a HasDisplayData instance. @@ -159,13 +154,15 @@ class DisplayDataItem(object): ValueError: If the item does not have a key, namespace, value or type. """ if self.key is None: - raise ValueError('Key must not be None') + raise ValueError('Invalid DisplayDataItem. Key must not be None') if self.namespace is None: - raise ValueError('Namespace must not be None') + raise ValueError('Invalid DisplayDataItem. Namespace must not be None') if self.value is None: - raise ValueError('Value must not be None') + raise ValueError('Invalid DisplayDataItem. Value must not be None') if self.type is None: - raise ValueError('Value {} is of an unsupported type.'.format(self.value)) + raise ValueError( + 'Invalid DisplayDataItem. Value {} is of an unsupported type.' + .format(self.value)) def get_dict(self): """ Returns the internal-API dictionary representing the DisplayDataItem. @@ -197,6 +194,11 @@ class DisplayDataItem(object): def __repr__(self): return 'DisplayDataItem({})'.format(json.dumps(self.get_dict())) + def __eq__(self, other): + if isinstance(other, self.__class__): + return self.get_dict() == other.get_dict() + return False + @classmethod def _format_value(cls, value, type_): """ Returns the API representation of a value given its type. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1d478dd4/sdks/python/apache_beam/transforms/display_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py index 906bb8f..013172f 100644 --- a/sdks/python/apache_beam/transforms/display_test.py +++ b/sdks/python/apache_beam/transforms/display_test.py @@ -50,8 +50,8 @@ class DisplayDataTest(unittest.TestCase): It does not test subcomponent inclusion """ class MyDoFn(beam.DoFn): - def __init__(self, *args, **kwargs): - self.my_display_data = kwargs.get('display_data', None) + def __init__(self, my_display_data=None): + self.my_display_data = my_display_data def process(self, context): yield context.element + 1 @@ -66,13 +66,13 @@ class DisplayDataTest(unittest.TestCase): 'my_dd': self.my_display_data} now = datetime.now() - fn = MyDoFn(display_data=now) + fn = MyDoFn(my_display_data=now) dd = DisplayData.create_from(fn) dd_dicts = sorted([item.get_dict() for item in dd.items], key=lambda x: x['namespace']+x['key']) nspace = '{}.{}'.format(fn.__module__, fn.__class__.__name__) - expected_items = [ + expected_items = sorted([ {'url': 'http://github.com', 'namespace': nspace, 'value': 'github.com', 'label': 'The URL', 'key': 'complex_url', 'type': 'STRING'}, @@ -84,14 +84,13 @@ class DisplayDataTest(unittest.TestCase): {'type': 'INTEGER', 'namespace': nspace, 'value': 120, 'key': 'static_integer'}, {'type': 'STRING', 'namespace': nspace, - 'value': 'static me!', 'key': 'static_string'}] - expected_items = sorted(expected_items, + 'value': 'static me!', 'key': 'static_string'}], key=lambda x: x['namespace']+x['key']) self.assertEqual(dd_dicts, expected_items) def test_subcomponent(self): - class SpecialParDo(beam.ParDo): + class SpecialParDo(beam.PTransform): def __init__(self, fn): self.fn = fn
