Laying down infrastructure for static display data
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/07885c86 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/07885c86 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/07885c86 Branch: refs/heads/python-sdk Commit: 07885c86c91a9fca65973489765bf3e9ba3ed461 Parents: 57f03f7 Author: Pablo <[email protected]> Authored: Fri Oct 14 11:44:47 2016 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Thu Oct 27 10:39:32 2016 -0700 ---------------------------------------------------------------------- .../apache_beam/runners/dataflow_runner.py | 26 ++++ sdks/python/apache_beam/runners/runner_test.py | 60 +++++++++ sdks/python/apache_beam/transforms/__init__.py | 1 + sdks/python/apache_beam/transforms/core.py | 3 +- .../apache_beam/transforms/display/__init__.py | 1 + .../transforms/display/display_data.py | 133 +++++++++++++++++++ .../transforms/display/display_data_test.py | 100 ++++++++++++++ .../python/apache_beam/transforms/ptransform.py | 3 +- sdks/python/apache_beam/utils/names.py | 1 + 9 files changed, 326 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/runners/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index f794c8b..226b460 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -35,6 +35,7 @@ from apache_beam.runners.runner import PipelineResult from apache_beam.runners.runner import PipelineRunner from apache_beam.runners.runner import PipelineState from apache_beam.runners.runner import PValueCache +from apache_beam.transforms.display import DisplayData from apache_beam.typehints import typehints from apache_beam.utils import names from apache_beam.utils.names import PropertyNames @@ -286,6 +287,9 @@ class DataflowPipelineRunner(PipelineRunner): '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) + step.add_property( + PropertyNames.DISPLAY_DATA, + DisplayData.create_from(transform).output()) def run_CreatePCollectionView(self, transform_node): step = self._add_step(TransformNames.COLLECTION_TO_SINGLETON, @@ -304,6 +308,9 @@ class DataflowPipelineRunner(PipelineRunner): '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) + step.add_property( + PropertyNames.DISPLAY_DATA, + DisplayData.create_from(transform_node.transform).output()) def run_Flatten(self, transform_node): step = self._add_step(TransformNames.FLATTEN, @@ -323,6 +330,9 @@ class DataflowPipelineRunner(PipelineRunner): '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) + step.add_property( + PropertyNames.DISPLAY_DATA, + DisplayData.create_from(transform_node.transform).output()) def apply_GroupByKey(self, transform, pcoll): # Infer coder of parent. @@ -364,6 +374,9 @@ class DataflowPipelineRunner(PipelineRunner): windowing = transform_node.transform.get_windowing( transform_node.inputs) step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(windowing)) + step.add_property( + PropertyNames.DISPLAY_DATA, + DisplayData.create_from(transform_node.transform).output()) def run_ParDo(self, transform_node): transform = transform_node.transform @@ -422,7 +435,11 @@ class DataflowPipelineRunner(PipelineRunner): PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: ( '%s_%s' % (PropertyNames.OUT, side_tag))}) + step.add_property(PropertyNames.OUTPUT_INFO, outputs) + step.add_property( + PropertyNames.DISPLAY_DATA, + DisplayData.create_from(transform).output()) @staticmethod def _pardo_fn_data(transform_node, get_label): @@ -471,6 +488,9 @@ class DataflowPipelineRunner(PipelineRunner): PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}) step.add_property(PropertyNames.OUTPUT_INFO, outputs) + step.add_property( + PropertyNames.DISPLAY_DATA, + DisplayData.create_from(transform).output()) def run_Read(self, transform_node): transform = transform_node.transform @@ -545,6 +565,9 @@ class DataflowPipelineRunner(PipelineRunner): '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) + step.add_property( + PropertyNames.DISPLAY_DATA, + DisplayData.create_from(transform).output()) def run__NativeWrite(self, transform_node): transform = transform_node.transform @@ -614,6 +637,9 @@ class DataflowPipelineRunner(PipelineRunner): {'@type': 'OutputReference', PropertyNames.STEP_NAME: input_step.proto.name, PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)}) + step.add_property( + PropertyNames.DISPLAY_DATA, + DisplayData.create_from(transform).output()) class DataflowPipelineResult(PipelineResult): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/runners/runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index 04de7fb..1f73a36 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -22,6 +22,8 @@ the other unit tests. In this file we choose to test only aspects related to caching and clearing values that are not tested elsewhere. """ +from datetime import datetime +import json import unittest import apache_beam as beam @@ -32,6 +34,7 @@ from apache_beam.runners import create_runner from apache_beam.runners import DataflowPipelineRunner from apache_beam.runners import DirectPipelineRunner import apache_beam.transforms as ptransform +from apache_beam.transforms.display import DisplayDataItem from apache_beam.utils.options import PipelineOptions @@ -66,6 +69,63 @@ class RunnerTest(unittest.TestCase): remote_runner.job = apiclient.Job(p.options) super(DataflowPipelineRunner, remote_runner).run(p) + def test_remote_runner_display_data(self): + remote_runner = DataflowPipelineRunner() + p = Pipeline(remote_runner, + options=PipelineOptions([ + '--dataflow_endpoint=ignored', + '--job_name=test-job', + '--project=test-project', + '--staging_location=ignored', + '--temp_location=/dev/null', + '--no_auth=True' + ])) + + class SpecialParDo(beam.ParDo): + def __init__(self, fn, now, *args, **kwargs): + super(SpecialParDo, self).__init__(fn, *args, **kwargs) + self.fn = fn + self.now = now + + # Make this a list to be accessible within closure + def display_data(self): + return {'asubcomponent': self.fn, + 'a_class': SpecialParDo, + 'a_time': self.now} + + class SpecialDoFn(beam.DoFn): + def display_data(self): + return {'dofn_value': 42} + + def process(self, context): + pass + + now = datetime.now() + # pylint: disable=expression-not-assigned + (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5]) + | 'do' >> SpecialParDo(SpecialDoFn(), now)) + + remote_runner.job = apiclient.Job(p.options) + super(DataflowPipelineRunner, remote_runner).run(p) + job_dict = json.loads(str(remote_runner.job)) + steps = [step + for step in job_dict['steps'] + if len(step['properties'].get('display_data', [])) > 0] + step = steps[0] + disp_data = step['properties']['display_data'] + disp_data = sorted(disp_data, key=lambda x: x['namespace']+x['key']) + nspace = SpecialParDo.__module__+ '.' + expected_data = [{'type': 'TIMESTAMP', 'namespace': nspace+'SpecialParDo', + 'value': DisplayDataItem._format_value(now, 'TIMESTAMP'), + 'key': 'a_time'}, + {'type': 'JAVA_CLASS', 'namespace': nspace+'SpecialParDo', + 'value': nspace+'SpecialParDo', 'key': 'a_class'}, + {'type': 'INTEGER', 'namespace': nspace+'SpecialDoFn', + 'value': 42, 'key': 'dofn_value'}] + expected_data = sorted(expected_data, key=lambda x: x['namespace']+x['key']) + self.assertEqual(len(disp_data), 3) + self.assertEqual(disp_data, expected_data) + def test_no_group_by_key_directly_after_bigquery(self): remote_runner = DataflowPipelineRunner() p = Pipeline(remote_runner, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/transforms/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py index db8e193..3cfe60b 100644 --- a/sdks/python/apache_beam/transforms/__init__.py +++ b/sdks/python/apache_beam/transforms/__init__.py @@ -24,3 +24,4 @@ from apache_beam.transforms.core import * from apache_beam.transforms.ptransform import * from apache_beam.transforms.timeutil import * from apache_beam.transforms.util import * +from apache_beam.transforms.display import * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index da26205..3b5816a 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -29,6 +29,7 @@ from apache_beam.coders import typecoders from apache_beam.internal import util from apache_beam.transforms import ptransform from apache_beam.transforms import window +from apache_beam.transforms.display import HasDisplayData from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.ptransform import PTransformWithSideInputs from apache_beam.transforms.window import MIN_TIMESTAMP @@ -118,7 +119,7 @@ class DoFnProcessContext(DoFnContext): self.state.counter_for(aggregator).update(input_value) -class DoFn(WithTypeHints): +class DoFn(WithTypeHints, HasDisplayData): """A function object used by a transform with custom processing. The ParDo transform is such a transform. The ParDo.apply http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/transforms/display/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display/__init__.py b/sdks/python/apache_beam/transforms/display/__init__.py new file mode 100644 index 0000000..c946ac3 --- /dev/null +++ b/sdks/python/apache_beam/transforms/display/__init__.py @@ -0,0 +1 @@ +from apache_beam.transforms.display.display_data import * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/transforms/display/display_data.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display/display_data.py b/sdks/python/apache_beam/transforms/display/display_data.py new file mode 100644 index 0000000..44909ad --- /dev/null +++ b/sdks/python/apache_beam/transforms/display/display_data.py @@ -0,0 +1,133 @@ +""" +DisplayData, its classes, interfaces and methods. +""" + +from __future__ import absolute_import + +import calendar +from datetime import datetime, timedelta +import inspect +import json + +__all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData'] + +class HasDisplayData(object): + """ Basic interface for elements that contain display data. + + It contains only the display_data method. + """ + def __init__(self, *args, **kwargs): + super(HasDisplayData, self).__init__(*args, **kwargs) + + def display_data(self): + return {} + + def _namespace(self): + return '{}.{}'.format(self.__module__, self.__class__.__name__) + + +class DisplayData(object): + def __init__(self, namespace='__main__'): + self.namespace = namespace + self.items = [] + + def populate_items(self, display_data_dict): + for key, element in display_data_dict.items(): + if isinstance(element, HasDisplayData): + subcomponent_display_data = DisplayData(element._namespace()) + subcomponent_display_data.populate_items(element.display_data()) + self.items += subcomponent_display_data.items + continue + + if isinstance(element, dict): + self.items.append( + DisplayDataItem(self.namespace, + key, + DisplayDataItem._get_value_type(element['value']), + element['value'], + shortValue=element.get('shortValue'), + url=element.get('url'), + label=element.get('label'))) + continue + + # If it's not a HasDisplayData element, + # nor a dictionary, then it's a simple value + self.items.append( + DisplayDataItem(self.namespace, + key, + DisplayDataItem._get_value_type(element), + element)) + + def output(self): + return [item.get_dict() for item in self.items] + + @classmethod + def create_from(cls, has_display_data): + if not isinstance(has_display_data, HasDisplayData): + raise ValueError('Element of class {}.{} does not subclass HasDisplayData' + .format(has_display_data.__module__, + has_display_data.__class__.__name__)) + display_data = cls(has_display_data._namespace()) + display_data.populate_items(has_display_data.display_data()) + return display_data + + +class DisplayDataItem(object): + typeDict = {str:'STRING', + int:'INTEGER', + float:'FLOAT', + timedelta:'DURATION', + datetime:'TIMESTAMP'} + + def __init__(self, namespace, key, type_, value, + shortValue=None, url=None, label=None): + if key is None: + raise ValueError('Key must not be None') + if value is None: + raise ValueError('Value must not be None') + if type_ is None: + raise ValueError('Value {} is of an unsupported type.'.format(value)) + + self.namespace = namespace + self.key = key + self.type = type_ + self.value = value + self.shortValue = shortValue + self.url = url + self.label = label + + def get_dict(self): + res = {'key': self.key, + 'namespace': self.namespace, + 'type': self.type} + + if self.url is not None: + res['url'] = self.url + # TODO: What to do about shortValue? No special processing? + if self.shortValue is not None: + res['shortValue'] = self.shortValue + if self.label is not None: + res['label'] = self.label + res['value'] = self._format_value(self.value, self.type) + return res + + def __repr__(self): + return 'DisplayDataItem({})'.format(json.dumps(self.get_dict())) + + @classmethod + def _format_value(cls, value, type_): + res = value + if type_ == 'JAVA_CLASS': + res = '{}.{}'.format(value.__module__, value.__name__) + if type_ == 'DURATION': + res = value.total_seconds()*1000 + if type_ == 'TIMESTAMP': + res = calendar.timegm(value.timetuple())*1000 + value.microsecond//1000 + return res + + @classmethod + def _get_value_type(cls, value): + type_ = cls.typeDict.get(type(value)) + if type_ is None: + type_ = 'JAVA_CLASS' if inspect.isclass(value) else None + return type_ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/transforms/display/display_data_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display/display_data_test.py b/sdks/python/apache_beam/transforms/display/display_data_test.py new file mode 100644 index 0000000..a107334 --- /dev/null +++ b/sdks/python/apache_beam/transforms/display/display_data_test.py @@ -0,0 +1,100 @@ +from __future__ import absolute_import + +from datetime import datetime +import unittest + +import apache_beam as beam +from apache_beam.transforms.display import HasDisplayData, DisplayData, DisplayDataItem + + +class DisplayDataTest(unittest.TestCase): + + def test_inheritance_ptransform(self): + class MyTransform(beam.PTransform): + pass + + display_pt = MyTransform() + # PTransform inherits from HasDisplayData. + self.assertTrue(isinstance(display_pt, HasDisplayData)) + self.assertEqual(display_pt.display_data(), {}) + + def test_inheritance_dofn(self): + class MyDoFn(beam.DoFn): + pass + + display_dofn = MyDoFn() + self.assertTrue(isinstance(display_dofn, HasDisplayData)) + self.assertEqual(display_dofn.display_data(), {}) + + def test_base_cases(self): + """ Tests basic display data cases (key:value, key:dict) + It does not test subcomponent inclusion + """ + class MyDoFn(beam.DoFn): + def __init__(self, *args, **kwargs): + self.my_display_data = kwargs.get('display_data', None) + + def process(self, context): + yield context.element + 1 + + def display_data(self): + return {'static_integer': 120, + 'static_string': 'static me!', + 'complex_url': {'value': 'github.com', + 'url': 'http://github.com', + 'label': 'The URL'}, + 'python_class': HasDisplayData, + 'my_dd': self.my_display_data} + + now = datetime.now() + fn = MyDoFn(display_data=now) + dd = DisplayData.create_from(fn) + dd_dicts = sorted([item.get_dict() for item in dd.items], + key=lambda x: x['namespace']+x['key']) + + nspace = '{}.{}'.format(fn.__module__, fn.__class__.__name__) + expected_items = [ + {'url': 'http://github.com', 'namespace': nspace, + 'value': 'github.com', 'label': 'The URL', + 'key': 'complex_url', 'type': 'STRING'}, + {'type': 'TIMESTAMP', 'namespace': nspace, 'key': 'my_dd', + 'value': DisplayDataItem._format_value(now, 'TIMESTAMP')}, + {'type': 'JAVA_CLASS', 'namespace': nspace, + 'value': 'apache_beam.transforms.display.display_data.HasDisplayData', + 'key': 'python_class'}, + {'type': 'INTEGER', 'namespace': nspace, + 'value': 120, 'key': 'static_integer'}, + {'type': 'STRING', 'namespace': nspace, + 'value': 'static me!', 'key': 'static_string'}] + expected_items = sorted(expected_items, + key=lambda x: x['namespace']+x['key']) + + self.assertEqual(dd_dicts, expected_items) + + def test_subcomponent(self): + class SpecialParDo(beam.ParDo): + def __init__(self, fn): + self.fn = fn + + def display_data(self): + return {'asubcomponent': self.fn} + + class SpecialDoFn(beam.DoFn): + def display_data(self): + return {'dofn_value': 42} + + dofn = SpecialDoFn() + pardo = SpecialParDo(dofn) + dd = DisplayData.create_from(pardo) + nspace = '{}.{}'.format(dofn.__module__, dofn.__class__.__name__) + self.assertEqual(dd.items[0].get_dict(), + {"type": "INTEGER", + "namespace": nspace, + "value": 42, + "key": "dofn_value"}) + + +# TODO: Test __repr__ function +# TODO: Test PATH when added by swegner@ +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 0713c59..0885f55 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -47,6 +47,7 @@ from apache_beam import pvalue from apache_beam import typehints from apache_beam.internal import pickler from apache_beam.internal import util +from apache_beam.transforms.display import HasDisplayData from apache_beam.typehints import getcallargs_forhints from apache_beam.typehints import TypeCheckError from apache_beam.typehints import validate_composite_type_param @@ -167,7 +168,7 @@ class ZipPValues(_PValueishTransform): self.visit(p, sibling, pairs, context) -class PTransform(WithTypeHints): +class PTransform(WithTypeHints, HasDisplayData): """A transform object used to modify one or more PCollections. Subclasses must define an apply() method that will be used when the transform http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/utils/names.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/names.py b/sdks/python/apache_beam/utils/names.py index 49f0402..41b5b43 100644 --- a/sdks/python/apache_beam/utils/names.py +++ b/sdks/python/apache_beam/utils/names.py @@ -49,6 +49,7 @@ class PropertyNames(object): BIGQUERY_PROJECT = 'project' BIGQUERY_SCHEMA = 'schema' BIGQUERY_WRITE_DISPOSITION = 'write_disposition' + DISPLAY_DATA = 'display_data' ELEMENT = 'element' ELEMENTS = 'elements' ENCODING = 'encoding'
