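Move display data into a single module (transforms/display.py). Let the dictionary returned by display_data() hold DisplayDataItem values, replacing the special handling of nested dictionaries.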
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d864d968 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d864d968 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d864d968 Branch: refs/heads/python-sdk Commit: d864d968eefb0fdc0088ec65fee0cc54955ae7b2 Parents: 1102201 Author: Pablo <pabl...@google.com> Authored: Mon Oct 17 16:56:40 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Thu Oct 27 10:39:32 2016 -0700 ---------------------------------------------------------------------- .../apache_beam/runners/dataflow_runner.py | 28 +--- sdks/python/apache_beam/runners/runner_test.py | 3 +- sdks/python/apache_beam/transforms/__init__.py | 1 - sdks/python/apache_beam/transforms/display.py | 152 +++++++++++++++++++ .../apache_beam/transforms/display/__init__.py | 1 - .../transforms/display/display_data.py | 151 ------------------ .../transforms/display/display_data_test.py | 119 --------------- .../apache_beam/transforms/display_test.py | 119 +++++++++++++++ 8 files changed, 277 insertions(+), 297 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/runners/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 226b460..c543d2f 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -263,6 +263,10 @@ class DataflowPipelineRunner(PipelineRunner): # cache always contain the tag. 
for tag in side_tags: self._cache.cache_output(transform_node, tag, step) + + step.add_property( + PropertyNames.DISPLAY_DATA, + DisplayData.create_from(transform_node.transform).output()) return step def run_Create(self, transform_node): @@ -287,9 +291,6 @@ class DataflowPipelineRunner(PipelineRunner): '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) - step.add_property( - PropertyNames.DISPLAY_DATA, - DisplayData.create_from(transform).output()) def run_CreatePCollectionView(self, transform_node): step = self._add_step(TransformNames.COLLECTION_TO_SINGLETON, @@ -308,9 +309,6 @@ class DataflowPipelineRunner(PipelineRunner): '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) - step.add_property( - PropertyNames.DISPLAY_DATA, - DisplayData.create_from(transform_node.transform).output()) def run_Flatten(self, transform_node): step = self._add_step(TransformNames.FLATTEN, @@ -330,9 +328,6 @@ class DataflowPipelineRunner(PipelineRunner): '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) - step.add_property( - PropertyNames.DISPLAY_DATA, - DisplayData.create_from(transform_node.transform).output()) def apply_GroupByKey(self, transform, pcoll): # Infer coder of parent. 
@@ -374,9 +369,6 @@ class DataflowPipelineRunner(PipelineRunner): windowing = transform_node.transform.get_windowing( transform_node.inputs) step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(windowing)) - step.add_property( - PropertyNames.DISPLAY_DATA, - DisplayData.create_from(transform_node.transform).output()) def run_ParDo(self, transform_node): transform = transform_node.transform @@ -437,9 +429,6 @@ class DataflowPipelineRunner(PipelineRunner): '%s_%s' % (PropertyNames.OUT, side_tag))}) step.add_property(PropertyNames.OUTPUT_INFO, outputs) - step.add_property( - PropertyNames.DISPLAY_DATA, - DisplayData.create_from(transform).output()) @staticmethod def _pardo_fn_data(transform_node, get_label): @@ -488,9 +477,6 @@ class DataflowPipelineRunner(PipelineRunner): PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}) step.add_property(PropertyNames.OUTPUT_INFO, outputs) - step.add_property( - PropertyNames.DISPLAY_DATA, - DisplayData.create_from(transform).output()) def run_Read(self, transform_node): transform = transform_node.transform @@ -565,9 +551,6 @@ class DataflowPipelineRunner(PipelineRunner): '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) - step.add_property( - PropertyNames.DISPLAY_DATA, - DisplayData.create_from(transform).output()) def run__NativeWrite(self, transform_node): transform = transform_node.transform @@ -637,9 +620,6 @@ class DataflowPipelineRunner(PipelineRunner): {'@type': 'OutputReference', PropertyNames.STEP_NAME: input_step.proto.name, PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)}) - step.add_property( - PropertyNames.DISPLAY_DATA, - DisplayData.create_from(transform).output()) class DataflowPipelineResult(PipelineResult): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/runners/runner_test.py 
---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index 8663a15..19160c3 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -112,7 +112,8 @@ class RunnerTest(unittest.TestCase): 'value': DisplayDataItem._format_value(now, 'TIMESTAMP'), 'key': 'a_time'}, {'type': 'JAVA_CLASS', 'namespace': nspace+'SpecialParDo', - 'value': nspace+'SpecialParDo', 'key': 'a_class'}, + 'value': nspace+'SpecialParDo', 'key': 'a_class', + 'shortValue': 'SpecialParDo'}, {'type': 'INTEGER', 'namespace': nspace+'SpecialDoFn', 'value': 42, 'key': 'dofn_value'}] expected_data = sorted(expected_data, key=lambda x: x['namespace']+x['key']) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py index 3cfe60b..db8e193 100644 --- a/sdks/python/apache_beam/transforms/__init__.py +++ b/sdks/python/apache_beam/transforms/__init__.py @@ -24,4 +24,3 @@ from apache_beam.transforms.core import * from apache_beam.transforms.ptransform import * from apache_beam.transforms.timeutil import * from apache_beam.transforms.util import * -from apache_beam.transforms.display import * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py new file mode 100644 index 0000000..87d3046 --- /dev/null +++ b/sdks/python/apache_beam/transforms/display.py @@ -0,0 +1,152 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor 
license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +DisplayData, its classes, interfaces and methods. +""" + +from __future__ import absolute_import + +import calendar +from datetime import datetime, timedelta +import inspect +import json + +__all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData'] + + +class HasDisplayData(object): + """ Basic interface for elements that contain display data. + + It contains only the display_data method and a namespace method.
+ """ + def display_data(self): + return {} + + def _namespace(self): + return '{}.{}'.format(self.__module__, self.__class__.__name__) + + +class DisplayData(object): + def __init__(self, namespace, display_data_dict): + self.namespace = namespace + self.items = [] + self.populate_items(display_data_dict) + + def populate_items(self, display_data_dict): + for key, element in display_data_dict.items(): + if isinstance(element, HasDisplayData): + subcomponent_display_data = DisplayData(element._namespace(), + element.display_data()) + self.items += subcomponent_display_data.items + continue + + if isinstance(element, DisplayDataItem): + element.key = key + element.namespace = self.namespace + self.items.append(element) + continue + + # If it's neither a HasDisplayData element + # nor a DisplayDataItem, then it's a simple value + self.items.append( + DisplayDataItem(element, + namespace=self.namespace, + key=key)) + + def output(self): + return [item.get_dict() for item in self.items] + + @classmethod + def create_from(cls, has_display_data): + if not isinstance(has_display_data, HasDisplayData): + raise ValueError('Element of class {}.{} does not subclass HasDisplayData' + .format(has_display_data.__module__, + has_display_data.__class__.__name__)) + return cls(has_display_data._namespace(), has_display_data.display_data()) + + +class DisplayDataItem(object): + typeDict = {str:'STRING', + int:'INTEGER', + float:'FLOAT', + timedelta:'DURATION', + datetime:'TIMESTAMP'} + + def __init__(self, value, url=None, label=None, + namespace=None, key=None, shortValue=None): + self.namespace = namespace + self.key = key + self.type = self._get_value_type(value) + self.shortValue = (shortValue if shortValue is not None else + self._get_short_value(value, self.type)) + self.value = value + self.url = url + self.label = label + + def is_valid(self): + if self.key is None: + raise ValueError('Key must not be None') + if self.namespace is None: + raise ValueError('Namespace must not be
None') + if self.value is None: + raise ValueError('Value must not be None') + if self.type is None: + raise ValueError('Value {} is of an unsupported type.'.format(self.value)) + + def get_dict(self): + self.is_valid() + + res = {'key': self.key, + 'namespace': self.namespace, + 'type': self.type} + + if self.url is not None: + res['url'] = self.url + if self.shortValue is not None: + res['shortValue'] = self.shortValue + if self.label is not None: + res['label'] = self.label + res['value'] = self._format_value(self.value, self.type) + return res + + def __repr__(self): + return 'DisplayDataItem({})'.format(json.dumps(self.get_dict())) + + @classmethod + def _format_value(cls, value, type_): + res = value + if type_ == 'JAVA_CLASS': + res = '{}.{}'.format(value.__module__, value.__name__) + if type_ == 'DURATION': + res = value.total_seconds()*1000 + if type_ == 'TIMESTAMP': + res = calendar.timegm(value.timetuple())*1000 + value.microsecond//1000 + return res + + @classmethod + def _get_short_value(cls, value, type_): + if type_ == 'JAVA_CLASS': + return value.__name__ + return None + + @classmethod + def _get_value_type(cls, value): + type_ = cls.typeDict.get(type(value)) + if type_ is None: + type_ = 'JAVA_CLASS' if inspect.isclass(value) else None + return type_ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display/__init__.py b/sdks/python/apache_beam/transforms/display/__init__.py deleted file mode 100644 index c946ac3..0000000 --- a/sdks/python/apache_beam/transforms/display/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from apache_beam.transforms.display.display_data import * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display/display_data.py
---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display/display_data.py b/sdks/python/apache_beam/transforms/display/display_data.py deleted file mode 100644 index cee7e74..0000000 --- a/sdks/python/apache_beam/transforms/display/display_data.py +++ /dev/null @@ -1,151 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -DisplayData, its classes, interfaces and methods. -""" - -from __future__ import absolute_import - -import calendar -from datetime import datetime, timedelta -import inspect -import json - -__all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData'] - - -class HasDisplayData(object): - """ Basic interface for elements that contain display data. - - It contains only the display_data method. 
- """ - def __init__(self, *args, **kwargs): - super(HasDisplayData, self).__init__(*args, **kwargs) - - def display_data(self): - return {} - - def _namespace(self): - return '{}.{}'.format(self.__module__, self.__class__.__name__) - - -class DisplayData(object): - def __init__(self, namespace='__main__'): - self.namespace = namespace - self.items = [] - - def populate_items(self, display_data_dict): - for key, element in display_data_dict.items(): - if isinstance(element, HasDisplayData): - subcomponent_display_data = DisplayData(element._namespace()) - subcomponent_display_data.populate_items(element.display_data()) - self.items += subcomponent_display_data.items - continue - - if isinstance(element, dict): - self.items.append( - DisplayDataItem(self.namespace, - key, - DisplayDataItem._get_value_type(element['value']), - element['value'], - shortValue=element.get('shortValue'), - url=element.get('url'), - label=element.get('label'))) - continue - - # If it's not a HasDisplayData element, - # nor a dictionary, then it's a simple value - self.items.append( - DisplayDataItem(self.namespace, - key, - DisplayDataItem._get_value_type(element), - element)) - - def output(self): - return [item.get_dict() for item in self.items] - - @classmethod - def create_from(cls, has_display_data): - if not isinstance(has_display_data, HasDisplayData): - raise ValueError('Element of class {}.{} does not subclass HasDisplayData' - .format(has_display_data.__module__, - has_display_data.__class__.__name__)) - display_data = cls(has_display_data._namespace()) - display_data.populate_items(has_display_data.display_data()) - return display_data - - -class DisplayDataItem(object): - typeDict = {str:'STRING', - int:'INTEGER', - float:'FLOAT', - timedelta:'DURATION', - datetime:'TIMESTAMP'} - - def __init__(self, namespace, key, type_, value, - shortValue=None, url=None, label=None): - if key is None: - raise ValueError('Key must not be None') - if value is None: - raise ValueError('Value 
must not be None') - if type_ is None: - raise ValueError('Value {} is of an unsupported type.'.format(value)) - - self.namespace = namespace - self.key = key - self.type = type_ - self.value = value - self.shortValue = shortValue - self.url = url - self.label = label - - def get_dict(self): - res = {'key': self.key, - 'namespace': self.namespace, - 'type': self.type} - - if self.url is not None: - res['url'] = self.url - # TODO: What to do about shortValue? No special processing? - if self.shortValue is not None: - res['shortValue'] = self.shortValue - if self.label is not None: - res['label'] = self.label - res['value'] = self._format_value(self.value, self.type) - return res - - def __repr__(self): - return 'DisplayDataItem({})'.format(json.dumps(self.get_dict())) - - @classmethod - def _format_value(cls, value, type_): - res = value - if type_ == 'JAVA_CLASS': - res = '{}.{}'.format(value.__module__, value.__name__) - if type_ == 'DURATION': - res = value.total_seconds()*1000 - if type_ == 'TIMESTAMP': - res = calendar.timegm(value.timetuple())*1000 + value.microsecond//1000 - return res - - @classmethod - def _get_value_type(cls, value): - type_ = cls.typeDict.get(type(value)) - if type_ is None: - type_ = 'JAVA_CLASS' if inspect.isclass(value) else None - return type_ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display/display_data_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display/display_data_test.py b/sdks/python/apache_beam/transforms/display/display_data_test.py deleted file mode 100644 index 345e137..0000000 --- a/sdks/python/apache_beam/transforms/display/display_data_test.py +++ /dev/null @@ -1,119 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit tests for the DisplayData API.""" - -from __future__ import absolute_import - -from datetime import datetime -import unittest - -import apache_beam as beam -from apache_beam.transforms.display import HasDisplayData, DisplayData, DisplayDataItem - - -class DisplayDataTest(unittest.TestCase): - - def test_inheritance_ptransform(self): - class MyTransform(beam.PTransform): - pass - - display_pt = MyTransform() - # PTransform inherits from HasDisplayData. 
- self.assertTrue(isinstance(display_pt, HasDisplayData)) - self.assertEqual(display_pt.display_data(), {}) - - def test_inheritance_dofn(self): - class MyDoFn(beam.DoFn): - pass - - display_dofn = MyDoFn() - self.assertTrue(isinstance(display_dofn, HasDisplayData)) - self.assertEqual(display_dofn.display_data(), {}) - - def test_base_cases(self): - """ Tests basic display data cases (key:value, key:dict) - It does not test subcomponent inclusion - """ - class MyDoFn(beam.DoFn): - def __init__(self, *args, **kwargs): - self.my_display_data = kwargs.get('display_data', None) - - def process(self, context): - yield context.element + 1 - - def display_data(self): - return {'static_integer': 120, - 'static_string': 'static me!', - 'complex_url': {'value': 'github.com', - 'url': 'http://github.com', - 'label': 'The URL'}, - 'python_class': HasDisplayData, - 'my_dd': self.my_display_data} - - now = datetime.now() - fn = MyDoFn(display_data=now) - dd = DisplayData.create_from(fn) - dd_dicts = sorted([item.get_dict() for item in dd.items], - key=lambda x: x['namespace']+x['key']) - - nspace = '{}.{}'.format(fn.__module__, fn.__class__.__name__) - expected_items = [ - {'url': 'http://github.com', 'namespace': nspace, - 'value': 'github.com', 'label': 'The URL', - 'key': 'complex_url', 'type': 'STRING'}, - {'type': 'TIMESTAMP', 'namespace': nspace, 'key': 'my_dd', - 'value': DisplayDataItem._format_value(now, 'TIMESTAMP')}, - {'type': 'JAVA_CLASS', 'namespace': nspace, - 'value': 'apache_beam.transforms.display.display_data.HasDisplayData', - 'key': 'python_class'}, - {'type': 'INTEGER', 'namespace': nspace, - 'value': 120, 'key': 'static_integer'}, - {'type': 'STRING', 'namespace': nspace, - 'value': 'static me!', 'key': 'static_string'}] - expected_items = sorted(expected_items, - key=lambda x: x['namespace']+x['key']) - - self.assertEqual(dd_dicts, expected_items) - - def test_subcomponent(self): - class SpecialParDo(beam.ParDo): - def __init__(self, fn): - self.fn = fn - 
- def display_data(self): - return {'asubcomponent': self.fn} - - class SpecialDoFn(beam.DoFn): - def display_data(self): - return {'dofn_value': 42} - - dofn = SpecialDoFn() - pardo = SpecialParDo(dofn) - dd = DisplayData.create_from(pardo) - nspace = '{}.{}'.format(dofn.__module__, dofn.__class__.__name__) - self.assertEqual(dd.items[0].get_dict(), - {"type": "INTEGER", - "namespace": nspace, - "value": 42, - "key": "dofn_value"}) - - -# TODO: Test __repr__ function -# TODO: Test PATH when added by swegner@ -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py new file mode 100644 index 0000000..227f3bc --- /dev/null +++ b/sdks/python/apache_beam/transforms/display_test.py @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Unit tests for the DisplayData API.""" + +from __future__ import absolute_import + +from datetime import datetime +import unittest + +import apache_beam as beam +from apache_beam.transforms.display import HasDisplayData, DisplayData, DisplayDataItem + + +class DisplayDataTest(unittest.TestCase): + + def test_inheritance_ptransform(self): + class MyTransform(beam.PTransform): + pass + + display_pt = MyTransform() + # PTransform inherits from HasDisplayData. + self.assertTrue(isinstance(display_pt, HasDisplayData)) + self.assertEqual(display_pt.display_data(), {}) + + def test_inheritance_dofn(self): + class MyDoFn(beam.DoFn): + pass + + display_dofn = MyDoFn() + self.assertTrue(isinstance(display_dofn, HasDisplayData)) + self.assertEqual(display_dofn.display_data(), {}) + + def test_base_cases(self): + """ Tests basic display data cases (key:value, key:DisplayDataItem) + It does not test subcomponent inclusion + """ + class MyDoFn(beam.DoFn): + def __init__(self, *args, **kwargs): + self.my_display_data = kwargs.get('display_data', None) + + def process(self, context): + yield context.element + 1 + + def display_data(self): + return {'static_integer': 120, + 'static_string': 'static me!', + 'complex_url': DisplayDataItem('github.com', + url='http://github.com', + label='The URL'), + 'python_class': HasDisplayData, + 'my_dd': self.my_display_data} + + now = datetime.now() + fn = MyDoFn(display_data=now) + dd = DisplayData.create_from(fn) + dd_dicts = sorted([item.get_dict() for item in dd.items], + key=lambda x: x['namespace']+x['key']) + + nspace = '{}.{}'.format(fn.__module__, fn.__class__.__name__) + expected_items = [ + {'url': 'http://github.com', 'namespace': nspace, + 'value': 'github.com', 'label': 'The URL', + 'key': 'complex_url', 'type': 'STRING'}, + {'type': 'TIMESTAMP', 'namespace': nspace, 'key': 'my_dd', + 'value': DisplayDataItem._format_value(now, 'TIMESTAMP')}, + {'type': 'JAVA_CLASS', 'namespace': nspace, + 'shortValue': 'HasDisplayData', 'key':
'python_class', + 'value': 'apache_beam.transforms.display.HasDisplayData'}, + {'type': 'INTEGER', 'namespace': nspace, + 'value': 120, 'key': 'static_integer'}, + {'type': 'STRING', 'namespace': nspace, + 'value': 'static me!', 'key': 'static_string'}] + expected_items = sorted(expected_items, + key=lambda x: x['namespace']+x['key']) + + self.assertEqual(dd_dicts, expected_items) + + def test_subcomponent(self): + class SpecialParDo(beam.ParDo): + def __init__(self, fn): + self.fn = fn + + def display_data(self): + return {'asubcomponent': self.fn} + + class SpecialDoFn(beam.DoFn): + def display_data(self): + return {'dofn_value': 42} + + dofn = SpecialDoFn() + pardo = SpecialParDo(dofn) + dd = DisplayData.create_from(pardo) + nspace = '{}.{}'.format(dofn.__module__, dofn.__class__.__name__) + self.assertEqual(dd.items[0].get_dict(), + {"type": "INTEGER", + "namespace": nspace, + "value": 42, + "key": "dofn_value"}) + + +# TODO: Test __repr__ function +# TODO: Test PATH when added by swegner@ +if __name__ == '__main__': + unittest.main()