Repository: incubator-beam Updated Branches: refs/heads/python-sdk 384fb5dc1 -> d1fccbf5e
Display Data for: PipelineOptions, combiners, more sources Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d805eec Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d805eec Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d805eec Branch: refs/heads/python-sdk Commit: 9d805eec6b9cedb43b6e79e255483fc8fa6832d1 Parents: 384fb5d Author: Pablo <pabl...@google.com> Authored: Wed Nov 9 14:03:03 2016 -0800 Committer: Robert Bradshaw <rober...@google.com> Committed: Tue Nov 15 11:02:28 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/internal/apiclient.py | 18 +++-- .../apache_beam/internal/json_value_test.py | 8 +- sdks/python/apache_beam/io/avroio.py | 20 ++++- sdks/python/apache_beam/io/avroio_test.py | 78 ++++++++++++++++++++ sdks/python/apache_beam/io/fileio.py | 20 ++++- sdks/python/apache_beam/io/fileio_test.py | 40 ++++++++-- sdks/python/apache_beam/io/iobase.py | 5 +- sdks/python/apache_beam/io/textio.py | 25 +++++-- sdks/python/apache_beam/io/textio_test.py | 28 +++++++ sdks/python/apache_beam/pipeline_test.py | 12 +-- sdks/python/apache_beam/transforms/combiners.py | 14 ++++ .../apache_beam/transforms/combiners_test.py | 63 ++++++++++++++++ sdks/python/apache_beam/transforms/core.py | 16 +++- .../python/apache_beam/transforms/ptransform.py | 9 +++ sdks/python/apache_beam/utils/options.py | 17 ++++- .../apache_beam/utils/pipeline_options_test.py | 46 ++++++++++-- sdks/python/setup.py | 3 + 17 files changed, 375 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index 5ac9d6e..8992ec3 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -32,6 +32,7 @@ from apache_beam import utils from apache_beam.internal.auth import get_service_credentials from apache_beam.internal.json_value import to_json_value from apache_beam.transforms import cy_combiners +from apache_beam.transforms.display import DisplayData from apache_beam.utils import dependency from apache_beam.utils import retry from apache_beam.utils.dependency import get_required_container_version @@ -234,11 +235,18 @@ class Environment(object): self.proto.sdkPipelineOptions = ( dataflow.Environment.SdkPipelineOptionsValue()) - for k, v in sdk_pipeline_options.iteritems(): - if v is not None: - self.proto.sdkPipelineOptions.additionalProperties.append( - dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( - key=k, value=to_json_value(v))) + options_dict = {k: v + for k, v in sdk_pipeline_options.iteritems() + if v is not None} + self.proto.sdkPipelineOptions.additionalProperties.append( + dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( + key='options', value=to_json_value(options_dict))) + + dd = DisplayData.create_from(options) + items = [item.get_dict() for item in dd.items] + self.proto.sdkPipelineOptions.additionalProperties.append( + dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( + key='display_data', value=to_json_value(items))) class Job(object): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/internal/json_value_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/json_value_test.py b/sdks/python/apache_beam/internal/json_value_test.py index cfab293..a4a47b8 100644 --- a/sdks/python/apache_beam/internal/json_value_test.py +++ b/sdks/python/apache_beam/internal/json_value_test.py @@ -76,14 +76,8 @@ class JsonValueTest(unittest.TestCase): self.assertEquals(long(27), from_json_value(to_json_value(long(27)))) def test_too_long_value(self): - try: + with self.assertRaises(TypeError): to_json_value(long(1 << 64)) - except TypeError as e: - pass - except Exception as e: - self.fail('Unexpected exception raised: {}'.format(e)) - else: - self.fail('TypeError not raised.') if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/avroio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index e7e73dd..6cba12d 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -76,11 +76,13 @@ class ReadFromAvro(PTransform): """ super(ReadFromAvro, self).__init__() self._args = (file_pattern, min_bundle_size) - self._validate = validate + self._source = _AvroSource(*self._args, validate=validate) def apply(self, pvalue): - return pvalue.pipeline | Read(_AvroSource(*self._args, - validate=self._validate)) + return pvalue.pipeline | Read(self._source) + + def display_data(self): + return {'source_dd': self._source} class _AvroUtils(object): @@ -292,9 +294,13 @@ class WriteToAvro(beam.transforms.PTransform): """ self._args = (file_path_prefix, schema, codec, file_name_suffix, num_shards, shard_name_template, mime_type) + self._sink = _AvroSink(*self._args) def apply(self, pcoll): - return pcoll | beam.io.iobase.Write(_AvroSink(*self._args)) + return pcoll | beam.io.iobase.Write(self._sink) + + def display_data(self): + return {'sink_dd': self._sink} class _AvroSink(fileio.FileSink): @@ -328,3 +334,9 @@ class _AvroSink(fileio.FileSink): def write_record(self, writer, value): writer.append(value) + + def display_data(self): + res = super(self.__class__, self).display_data() + res['codec'] = str(self._codec) + res['schema'] = str(self._schema) + return res http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/avroio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 9e356ca..e8fb12b 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -21,15 +21,20 @@ import os import tempfile import unittest +import hamcrest as hc + import apache_beam as beam from apache_beam.io import avroio from apache_beam.io import filebasedsource from apache_beam.io import source_test_utils +from apache_beam.transforms.display import DisplayData +from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to # Importing following private class for testing purposes. from apache_beam.io.avroio import _AvroSource as AvroSource +from apache_beam.io.avroio import _AvroSink as AvroSink import avro.datafile from avro.datafile import DataFileWriter @@ -144,6 +149,79 @@ class TestAvro(unittest.TestCase): expected_result = self.RECORDS self._run_avro_test(file_name, 100, True, expected_result) + def test_source_display_data(self): + file_name = 'some_avro_source' + source = AvroSource(file_name, validate=False) + dd = DisplayData.create_from(source) + + # No extra avro parameters for AvroSource. + expected_items = [ + DisplayDataItemMatcher('compression', 'auto'), + DisplayDataItemMatcher('filePattern', file_name)] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + def test_read_display_data(self): + file_name = 'some_avro_source' + read = avroio.ReadFromAvro(file_name, validate=False) + dd = DisplayData.create_from(read) + + # No extra avro parameters for AvroSource. + expected_items = [ + DisplayDataItemMatcher('compression', 'auto'), + DisplayDataItemMatcher('filePattern', file_name)] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + def test_sink_display_data(self): + file_name = 'some_avro_sink' + sink = AvroSink(file_name, + self.SCHEMA, + 'null', + '.end', + 0, + None, + 'application/x-avro') + dd = DisplayData.create_from(sink) + expected_items = [ + DisplayDataItemMatcher( + 'schema', + str(self.SCHEMA)), + DisplayDataItemMatcher( + 'filePattern', + 'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d.end'), + DisplayDataItemMatcher( + 'shards', + 0), + DisplayDataItemMatcher( + 'codec', + 'null'), + DisplayDataItemMatcher( + 'compression', + 'uncompressed')] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + def test_write_display_data(self): + file_name = 'some_avro_sink' + write = avroio.WriteToAvro(file_name, + self.SCHEMA) + dd = DisplayData.create_from(write) + expected_items = [ + DisplayDataItemMatcher( + 'schema', + str(self.SCHEMA)), + DisplayDataItemMatcher( + 'filePattern', + 'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d'), + DisplayDataItemMatcher( + 'shards', + 0), + DisplayDataItemMatcher( + 'codec', + 'deflate'), + DisplayDataItemMatcher( + 'compression', + 'uncompressed')] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + def test_read_reentrant_without_splitting(self): file_name = self._write_data() source = AvroSource(file_name) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index ef20a7c..3b67c4f 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -169,7 +169,9 @@ class NativeFileSource(dataflow_io.NativeSource): def display_data(self): return {'filePattern': DisplayDataItem(self.file_path, - label="File Pattern")} + label="File Pattern"), + 'compression': DisplayDataItem(str(self.compression_type), + label='Compression')} def __eq__(self, other): return (self.file_path == other.file_path and @@ -799,6 +801,17 @@ class FileSink(iobase.Sink): self.compression_type = compression_type self.mime_type = mime_type + def display_data(self): + return {'shards': + DisplayDataItem(self.num_shards, label='Number of Shards'), + 'compression': + DisplayDataItem(str(self.compression_type)), + 'filePattern': + DisplayDataItem('{}{}{}'.format(self.file_path_prefix, + self.shard_name_format, + self.file_name_suffix), + label='File Pattern')} + def open(self, temp_path): """Opens ``temp_path``, returning an opaque file handle object. @@ -1071,7 +1084,10 @@ class NativeFileSink(dataflow_io.NativeSink): file_name_pattern = '{}{}{}'.format(self.file_name_prefix, self.shard_name_template, self.file_name_suffix) - return {'filePattern': + return {'shards': + DisplayDataItem(self.num_shards, + label='Number of Shards'), + 'filePattern': DisplayDataItem(file_name_pattern, label='File Name Pattern'), 'compression': http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index b55fa19..63e71e0 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -68,7 +68,8 @@ class TestTextFileSource(unittest.TestCase): self.assertEqual(read_lines, output_lines) dd = DisplayData.create_from(source) expected_items = [ - DisplayDataItemMatcher('filePattern', file_name)] + DisplayDataItemMatcher('filePattern', file_name), + DisplayDataItemMatcher('compression', 'auto')] hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) @@ -611,7 +612,10 @@ class TestNativeTextFileSink(unittest.TestCase): '{}{}'.format(self.path, '-SSSSS-of-NNNNN')), DisplayDataItemMatcher( 'compression', - 'auto')] + 'auto'), + DisplayDataItemMatcher( + 'shards', + 0)] hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) def test_text_file_display_data_suffix(self): @@ -623,7 +627,10 @@ class TestNativeTextFileSink(unittest.TestCase): '{}{}{}'.format(self.path, '-SSSSS-of-NNNNN', '.pdf')), DisplayDataItemMatcher( 'compression', - 'auto')] + 'auto'), + DisplayDataItemMatcher( + 'shards', + 0)] hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) def test_write_text_file_empty(self): @@ -651,7 +658,10 @@ class TestNativeTextFileSink(unittest.TestCase): '{}{}'.format(self.path, '-SSSSS-of-NNNNN')), DisplayDataItemMatcher( 'compression', - 'gzip')] + 'gzip'), + DisplayDataItemMatcher( + 'shards', + 0)] hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) def test_write_text_gzip_file_auto(self): @@ -688,7 +698,10 @@ class TestNativeTextFileSink(unittest.TestCase): '{}{}'.format(self.path, '-SSSSS-of-NNNNN')), DisplayDataItemMatcher( 'compression', - 'bzip2')] + 'bzip2'), + DisplayDataItemMatcher( + 'shards', + 0)] hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) def test_write_text_bzip2_file_auto(self): @@ -764,6 +777,23 @@ class TestFileSink(unittest.TestCase): # Check that any temp files are deleted. self.assertItemsEqual([shard1, shard2], glob.glob(temp_path + '*')) + def test_file_sink_display_data(self): + temp_path = tempfile.NamedTemporaryFile().name + sink = MyFileSink( + temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder()) + dd = DisplayData.create_from(sink) + expected_items = [ + DisplayDataItemMatcher( + 'shards', 0), + DisplayDataItemMatcher( + 'compression', 'auto'), + DisplayDataItemMatcher( + 'filePattern', + '{}{}'.format(temp_path, + '-%(shard_num)05d-of-%(num_shards)05d.foo'))] + + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + def test_empty_write(self): temp_path = tempfile.NamedTemporaryFile().name sink = MyFileSink( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index a0de131..b7cac3e 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -42,7 +42,8 @@ from apache_beam.pvalue import AsSingleton from apache_beam.transforms import core from apache_beam.transforms import ptransform from apache_beam.transforms import window -from apache_beam.transforms.display import HasDisplayData, DisplayDataItem +from apache_beam.transforms.display import HasDisplayData +from apache_beam.transforms.display import DisplayDataItem # Encapsulates information about a bundle of a source generated when method @@ -414,7 +415,7 @@ class RangeTracker(object): raise NotImplementedError -class Sink(object): +class Sink(HasDisplayData): """A resource that can be written to using the ``df.io.Write`` transform. Here ``df`` stands for Dataflow Python code imported in following manner. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/textio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index e031572..4e94f87 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -25,6 +25,7 @@ from apache_beam.io import fileio from apache_beam.io.iobase import Read from apache_beam.io.iobase import Write from apache_beam.transforms import PTransform +from apache_beam.transforms.display import DisplayDataItem __all__ = ['ReadFromText', 'WriteToText'] @@ -237,13 +238,21 @@ class ReadFromText(PTransform): """ super(ReadFromText, self).__init__(**kwargs) - self._args = (file_pattern, min_bundle_size, compression_type, - strip_trailing_newlines, coder) - self._validate = validate + self._file_pattern = file_pattern + self._min_bundle_size = min_bundle_size + self._compression_type = compression_type + self._strip_trailing_newlines = strip_trailing_newlines + self._coder = coder + self._source = _TextSource(file_pattern, min_bundle_size, compression_type, + strip_trailing_newlines, coder, validate=validate) def apply(self, pvalue): - return pvalue.pipeline | Read(_TextSource(*self._args, - validate=self._validate)) + return pvalue.pipeline | Read(self._source) + + def display_data(self): + return {'source_dd': self._source, + 'strip_nwln': DisplayDataItem(self._strip_trailing_newlines, + label='Strip Trailing New Lines')} class WriteToText(PTransform): @@ -292,6 +301,10 @@ class WriteToText(PTransform): self._args = (file_path_prefix, file_name_suffix, append_trailing_newlines, num_shards, shard_name_template, coder, compression_type) + self._sink = _TextSink(*self._args) def apply(self, pcoll): - return pcoll | Write(_TextSink(*self._args)) + return pcoll | Write(self._sink) + + def display_data(self): + return {'sink_dd': self._sink} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/textio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 81d04ab..acdac47 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -25,6 +25,8 @@ import os import tempfile import unittest +import hamcrest as hc + import apache_beam as beam import apache_beam.io.source_test_utils as source_test_utils @@ -41,6 +43,9 @@ from apache_beam.io.filebasedsource_test import write_data from apache_beam.io.filebasedsource_test import write_pattern from apache_beam.io.fileio import CompressionTypes +from apache_beam.transforms.display import DisplayData +from apache_beam.transforms.display_test import DisplayDataItemMatcher + from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to @@ -247,6 +252,15 @@ class TextSourceTest(unittest.TestCase): splits[0].source, splits[0].start_position, splits[0].stop_position, perform_multi_threaded_test=False) + def test_read_display_data(self): + read = ReadFromText('prefix', validate=False) + dd = DisplayData.create_from(read) + expected_items = [ + DisplayDataItemMatcher('compression', 'auto'), + DisplayDataItemMatcher('filePattern', 'prefix'), + DisplayDataItemMatcher('strip_nwln', True)] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + def test_dataflow_single_file(self): file_name, expected_data = write_data(5) assert len(expected_data) == 5 @@ -450,6 +464,20 @@ class TextSinkTest(unittest.TestCase): with gzip.GzipFile(self.path, 'r') as f: self.assertEqual(f.read().splitlines(), []) + def test_write_display_data(self): + write = WriteToText('prefix') + dd = DisplayData.create_from(write) + expected_items = [ + DisplayDataItemMatcher( + 'compression', 'auto'), + DisplayDataItemMatcher( + 'shards', 0), + DisplayDataItemMatcher( + 'filePattern', + '{}{}'.format('prefix', + '-%(shard_num)05d-of-%(num_shards)05d'))] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + def test_write_dataflow(self): pipeline = beam.Pipeline('DirectPipelineRunner') pcoll = pipeline | beam.core.Create('Create', self.lines) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 013796c..c50f04d 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -295,12 +295,14 @@ class PipelineOptionsTest(unittest.TestCase): def test_dir(self): options = Breakfast() self.assertEquals( - ['from_dictionary', 'get_all_options', 'slices', 'style', 'view_as'], - [attr for attr in dir(options) if not attr.startswith('_')]) + set(['from_dictionary', 'get_all_options', 'slices', 'style', + 'view_as', 'display_data']), + set([attr for attr in dir(options) if not attr.startswith('_')])) self.assertEquals( - ['from_dictionary', 'get_all_options', 'style', 'view_as'], - [attr for attr in dir(options.view_as(Eggs)) - if not attr.startswith('_')]) + set(['from_dictionary', 'get_all_options', 'style', 'view_as', + 'display_data']), + set([attr for attr in dir(options.view_as(Eggs)) + if not attr.startswith('_')])) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/transforms/combiners.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index a0604b8..22d2b3e 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -25,6 +25,7 @@ import random from apache_beam.transforms import core from apache_beam.transforms import cy_combiners from apache_beam.transforms import ptransform +from apache_beam.transforms.display import DisplayDataItem from apache_beam.typehints import Any from apache_beam.typehints import Dict from apache_beam.typehints import KV @@ -282,6 +283,13 @@ class TopCombineFn(core.CombineFn): buffer.sort(cmp=lambda a, b: (not lt(a, b)) - (not lt(b, a)), key=self._key_fn) + def display_data(self): + return {'n': self._n, + 'compare': DisplayDataItem(self._compare.__name__ + if hasattr(self._compare, '__name__') + else self._compare.__class__.__name__) + .drop_if_none()} + # The accumulator type is a tuple (threshold, buffer), where threshold # is the smallest element [key] that could possibly be in the top n based # on the elements observed so far, and buffer is a (periodically sorted) @@ -413,6 +421,12 @@ class _TupleCombineFnBase(core.CombineFn): def __init__(self, *combiners): self._combiners = [core.CombineFn.maybe_from_callable(c) for c in combiners] + self._named_combiners = combiners + + def display_data(self): + combiners = [c.__name__ if hasattr(c, '__name__') else c.__class__.__name__ + for c in self._named_combiners] + return {'combiners': str(combiners)} def create_accumulator(self): return [c.create_accumulator() for c in self._combiners] http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/transforms/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 2858d0d..d28c63f 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -19,12 +19,16 @@ import unittest +import hamcrest as hc + import apache_beam as beam from apache_beam.pipeline import Pipeline import apache_beam.transforms.combiners as combine from apache_beam.transforms.core import CombineGlobally from apache_beam.transforms.core import Create from apache_beam.transforms.core import Map +from apache_beam.transforms.display import DisplayData +from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.util import assert_that, equal_to @@ -137,6 +141,65 @@ class CombineTest(unittest.TestCase): [range(1000), range(100), range(1001)], [1000, 999, 999, 998, 998]) + def test_combine_per_key_top_display_data(self): + def individual_test_per_key_dd(combineFn): + transform = beam.CombinePerKey(combineFn) + dd = DisplayData.create_from(transform) + expected_items = [ + DisplayDataItemMatcher('combineFn', combineFn.__class__), + DisplayDataItemMatcher('n', combineFn._n), + DisplayDataItemMatcher('compare', combineFn._compare.__name__)] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + individual_test_per_key_dd(combine.Largest(5)) + individual_test_per_key_dd(combine.Smallest(3)) + individual_test_per_key_dd(combine.TopCombineFn(8)) + individual_test_per_key_dd(combine.Largest(5)) + + def test_combine_sample_display_data(self): + def individual_test_per_key_dd(sampleFn, args, kwargs): + trs = [beam.CombinePerKey(sampleFn(*args, **kwargs)), + beam.CombineGlobally(sampleFn(*args, **kwargs))] + for transform in trs: + dd = DisplayData.create_from(transform) + expected_items = [ + DisplayDataItemMatcher('fn', sampleFn.fn.__name__), + DisplayDataItemMatcher('combineFn', + transform.fn.__class__)] + if len(args) > 0: + expected_items.append( + DisplayDataItemMatcher('args', str(args))) + if len(kwargs) > 0: + expected_items.append( + DisplayDataItemMatcher('kwargs', str(kwargs))) + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + individual_test_per_key_dd(combine.Sample.FixedSizePerKey, + args=(5,), + kwargs={}) + individual_test_per_key_dd(combine.Sample.FixedSizeGlobally, + args=(8,), + kwargs={'arg': 9}) + + def test_combine_globally_display_data(self): + transform = beam.CombineGlobally(combine.Smallest(5)) + dd = DisplayData.create_from(transform) + expected_items = [ + DisplayDataItemMatcher('combineFn', combine.Smallest), + DisplayDataItemMatcher('n', 5), + DisplayDataItemMatcher('compare', 'gt')] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + def test_basic_combiners_display_data(self): + transform = beam.CombineGlobally( + combine.TupleCombineFn(max, combine.MeanCombineFn(), sum)) + dd = DisplayData.create_from(transform) + expected_items = [ + DisplayDataItemMatcher('combineFn', combine.TupleCombineFn), + DisplayDataItemMatcher('combiners', + "['max', 'MeanCombineFn', 'sum']")] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + def test_top_shorthands(self): pipeline = Pipeline('DirectPipelineRunner') http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 3189de7..ffcdd10 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -269,7 +269,7 @@ class CallableWrapperDoFn(DoFn): return getattr(self._fn, '_argspec_fn', self._fn) -class CombineFn(WithTypeHints): +class CombineFn(WithTypeHints, HasDisplayData): """A function object used by a Combine transform with custom processing. A CombineFn specifies how multiple values in all or part of a PCollection can @@ -415,6 +415,9 @@ class CallableWrapperCombineFn(CombineFn): super(CallableWrapperCombineFn, self).__init__() self._fn = fn + def display_data(self): + return {'fn_dd': self._fn} + def __repr__(self): return "CallableWrapperCombineFn(%s)" % self._fn @@ -828,6 +831,12 @@ class CombineGlobally(PTransform): self.args = args self.kwargs = kwargs + def display_data(self): + return {'combineFn': + DisplayDataItem(self.fn.__class__, label='Combine Function'), + 'combineFn_dd': + self.fn} + def default_label(self): return 'CombineGlobally(%s)' % ptransform.label_from_callable(self.fn) @@ -914,6 +923,11 @@ class CombinePerKey(PTransformWithSideInputs): Returns: A PObject holding the result of the combine operation. """ + def display_data(self): + return {'combineFn': + DisplayDataItem(self.fn.__class__, label='Combine Function'), + 'combineFn_dd': + self.fn} def make_fn(self, fn): self._fn_label = ptransform.label_from_callable(fn) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 0885f55..2212d00 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -48,6 +48,7 @@ from apache_beam import typehints from apache_beam.internal import pickler from apache_beam.internal import util from apache_beam.transforms.display import HasDisplayData +from apache_beam.transforms.display import DisplayDataItem from apache_beam.typehints import getcallargs_forhints from apache_beam.typehints import TypeCheckError from apache_beam.typehints import validate_composite_type_param @@ -629,6 +630,14 @@ class CallablePTransform(PTransform): self._args = () self._kwargs = {} + def display_data(self): + res = {'fn': (self.fn.__name__ + if hasattr(self.fn, '__name__') + else self.fn.__class__), + 'args': DisplayDataItem(str(self._args)).drop_if_default('()'), + 'kwargs': DisplayDataItem(str(self._kwargs)).drop_if_default('{}')} + return res + def __call__(self, *args, **kwargs): if args and args[0] is None: label, self._args = None, args[1:] http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/utils/options.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py index f68335b..aacb186 100644 --- a/sdks/python/apache_beam/utils/options.py +++ b/sdks/python/apache_beam/utils/options.py @@ -22,8 +22,10 @@ TODO(silviuc): Should rename this module to pipeline_options. import argparse +from apache_beam.transforms.display import HasDisplayData -class PipelineOptions(object): + +class PipelineOptions(HasDisplayData): """Pipeline options class used as container for command line options. The class is essentially a wrapper over the standard argparse Python module @@ -104,12 +106,16 @@ class PipelineOptions(object): return cls(flags) - def get_all_options(self): + def get_all_options(self, drop_default=False): """Returns a dictionary of all defined arguments. Returns a dictionary of all defined arguments (arguments that are defined in any subclass of PipelineOptions) into a dictionary. + Args: + drop_default: If set to true, options that are equal to their default + values, are not returned as part of the result dictionary. + Returns: Dictionary of all args and values. """ @@ -120,12 +126,17 @@ class PipelineOptions(object): result = vars(known_args) # Apply the overrides if any - for k in result: + for k in result.keys(): if k in self._all_options: result[k] = self._all_options[k] + if drop_default and parser.get_default(k) == result[k]: + del result[k] return result + def display_data(self): + return self.get_all_options(True) + def view_as(self, cls): view = cls(self._flags) view._all_options = self._all_options http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/utils/pipeline_options_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py index 3b70e1e..ed55362 100644 --- a/sdks/python/apache_beam/utils/pipeline_options_test.py +++ b/sdks/python/apache_beam/utils/pipeline_options_test.py @@ -20,6 +20,9 @@ import logging import unittest +import hamcrest as hc +from apache_beam.transforms.display import DisplayData +from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.utils.options import PipelineOptions @@ -27,25 +30,48 @@ class PipelineOptionsTest(unittest.TestCase): TEST_CASES = [ {'flags': ['--num_workers', '5'], - 'expected': {'num_workers': 5, 'mock_flag': False, 'mock_option': None}}, + 'expected': {'num_workers': 5, 'mock_flag': False, 'mock_option': None}, + 'display_data': [DisplayDataItemMatcher('num_workers', 5)]}, { 'flags': [ '--profile_cpu', '--profile_location', 'gs://bucket/', 'ignored'], 'expected': { 'profile_cpu': True, 'profile_location': 'gs://bucket/', - 'mock_flag': False, 'mock_option': None} + 'mock_flag': False, 'mock_option': None}, + 'display_data': [ + DisplayDataItemMatcher('profile_cpu', + True), + DisplayDataItemMatcher('profile_location', + 'gs://bucket/')] }, {'flags': ['--num_workers', '5', '--mock_flag'], - 'expected': {'num_workers': 5, 'mock_flag': True, 'mock_option': None}}, + 'expected': {'num_workers': 5, 'mock_flag': True, 'mock_option': None}, + 'display_data': [ + DisplayDataItemMatcher('num_workers', 5), + DisplayDataItemMatcher('mock_flag', True)] + }, {'flags': ['--mock_option', 'abc'], - 'expected': {'mock_flag': False, 'mock_option': 'abc'}}, + 'expected': {'mock_flag': False, 'mock_option': 'abc'}, + 'display_data': [ + DisplayDataItemMatcher('mock_option', 'abc')] + }, {'flags': ['--mock_option', ' abc def '], - 'expected': {'mock_flag': False, 'mock_option': ' abc def '}}, + 'expected': {'mock_flag': False, 'mock_option': ' abc def '}, + 'display_data': [ + DisplayDataItemMatcher('mock_option', ' abc def ')] + }, {'flags': ['--mock_option= abc xyz '], - 'expected': {'mock_flag': False, 'mock_option': ' abc xyz '}}, + 'expected': {'mock_flag': False, 'mock_option': ' abc xyz '}, + 'display_data': [ + DisplayDataItemMatcher('mock_option', ' abc xyz ')] + }, {'flags': ['--mock_option=gs://my bucket/my folder/my file'], 'expected': {'mock_flag': False, - 'mock_option': 'gs://my bucket/my folder/my file'}}, + 'mock_option': 'gs://my bucket/my folder/my file'}, + 'display_data': [ + DisplayDataItemMatcher( + 'mock_option', 'gs://my bucket/my folder/my file')] + }, ] # Used for testing newly added flags. @@ -57,6 +83,12 @@ class PipelineOptionsTest(unittest.TestCase): parser.add_argument('--mock_option', help='mock option') parser.add_argument('--option with space', help='mock option with space') + def test_display_data(self): + for case in PipelineOptionsTest.TEST_CASES: + options = PipelineOptions(flags=case['flags']) + dd = DisplayData.create_from(options) + hc.assert_that(dd.items, hc.contains_inanyorder(*case['display_data'])) + def test_get_all_options(self): for case in PipelineOptionsTest.TEST_CASES: options = PipelineOptions(flags=case['flags']) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/setup.py ---------------------------------------------------------------------- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 4010b06..c7b940d 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -95,6 +95,9 @@ REQUIRED_PACKAGES = [ 'python-gflags>=2.0,<4.0.0', 'pyyaml>=3.10,<4.0.0', ] +REQUIRED_TEST_PACKAGES = [ + 'pyhamcrest>=1.9,<2.0', + ] REQUIRED_TEST_PACKAGES = [