Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 384fb5dc1 -> d1fccbf5e


Display Data for: PipelineOptions, combiners, more sources


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d805eec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d805eec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d805eec

Branch: refs/heads/python-sdk
Commit: 9d805eec6b9cedb43b6e79e255483fc8fa6832d1
Parents: 384fb5d
Author: Pablo <pabl...@google.com>
Authored: Wed Nov 9 14:03:03 2016 -0800
Committer: Robert Bradshaw <rober...@google.com>
Committed: Tue Nov 15 11:02:28 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py   | 18 +++--
 .../apache_beam/internal/json_value_test.py     |  8 +-
 sdks/python/apache_beam/io/avroio.py            | 20 ++++-
 sdks/python/apache_beam/io/avroio_test.py       | 78 ++++++++++++++++++++
 sdks/python/apache_beam/io/fileio.py            | 20 ++++-
 sdks/python/apache_beam/io/fileio_test.py       | 40 ++++++++--
 sdks/python/apache_beam/io/iobase.py            |  5 +-
 sdks/python/apache_beam/io/textio.py            | 25 +++++--
 sdks/python/apache_beam/io/textio_test.py       | 28 +++++++
 sdks/python/apache_beam/pipeline_test.py        | 12 +--
 sdks/python/apache_beam/transforms/combiners.py | 14 ++++
 .../apache_beam/transforms/combiners_test.py    | 63 ++++++++++++++++
 sdks/python/apache_beam/transforms/core.py      | 16 +++-
 .../python/apache_beam/transforms/ptransform.py |  9 +++
 sdks/python/apache_beam/utils/options.py        | 17 ++++-
 .../apache_beam/utils/pipeline_options_test.py  | 46 ++++++++++--
 sdks/python/setup.py                            |  3 +
 17 files changed, 375 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 5ac9d6e..8992ec3 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -32,6 +32,7 @@ from apache_beam import utils
 from apache_beam.internal.auth import get_service_credentials
 from apache_beam.internal.json_value import to_json_value
 from apache_beam.transforms import cy_combiners
+from apache_beam.transforms.display import DisplayData
 from apache_beam.utils import dependency
 from apache_beam.utils import retry
 from apache_beam.utils.dependency import get_required_container_version
@@ -234,11 +235,18 @@ class Environment(object):
       self.proto.sdkPipelineOptions = (
           dataflow.Environment.SdkPipelineOptionsValue())
 
-      for k, v in sdk_pipeline_options.iteritems():
-        if v is not None:
-          self.proto.sdkPipelineOptions.additionalProperties.append(
-              dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
-                  key=k, value=to_json_value(v)))
+      options_dict = {k: v
+                      for k, v in sdk_pipeline_options.iteritems()
+                      if v is not None}
+      self.proto.sdkPipelineOptions.additionalProperties.append(
+          dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
+              key='options', value=to_json_value(options_dict)))
+
+      dd = DisplayData.create_from(options)
+      items = [item.get_dict() for item in dd.items]
+      self.proto.sdkPipelineOptions.additionalProperties.append(
+          dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
+              key='display_data', value=to_json_value(items)))
 
 
 class Job(object):
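
A note on the serialization above: the raw options land under an 'options'
key, and the display data items are flattened to plain dicts under
'display_data' before to_json_value() is applied. A minimal sketch of the
resulting payload shape (the field names inside the item dicts are
illustrative, not Beam's exact wire schema):

    import json

    # Hypothetical options, mirroring the filtering in the diff above.
    sdk_pipeline_options = {'num_workers': 5, 'staging_location': None}
    options_dict = {k: v for k, v in sdk_pipeline_options.items()
                    if v is not None}

    # Each DisplayDataItem is reduced to a plain dict before conversion.
    items = [{'namespace': 'PipelineOptions', 'key': k, 'value': v}
             for k, v in options_dict.items()]

    payload = {'options': options_dict, 'display_data': items}
    print(json.dumps(payload))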

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/internal/json_value_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/json_value_test.py b/sdks/python/apache_beam/internal/json_value_test.py
index cfab293..a4a47b8 100644
--- a/sdks/python/apache_beam/internal/json_value_test.py
+++ b/sdks/python/apache_beam/internal/json_value_test.py
@@ -76,14 +76,8 @@ class JsonValueTest(unittest.TestCase):
     self.assertEquals(long(27), from_json_value(to_json_value(long(27))))
 
   def test_too_long_value(self):
-    try:
+    with self.assertRaises(TypeError):
       to_json_value(long(1 << 64))
-    except TypeError as e:
-      pass
-    except Exception as e:
-      self.fail('Unexpected exception raised: {}'.format(e))
-    else:
-      self.fail('TypeError not raised.')
 
 
 if __name__ == '__main__':
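
The rewrite relies on the context-manager form of assertRaises, which
fails the test both when no exception is raised and when a different
exception type escapes the block; a standalone illustration:

    import unittest

    class AssertRaisesExample(unittest.TestCase):
      def test_type_error(self):
        # Passes: calling len() on an int raises TypeError in the block.
        with self.assertRaises(TypeError):
          len(5)

    if __name__ == '__main__':
      unittest.main()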

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index e7e73dd..6cba12d 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -76,11 +76,13 @@ class ReadFromAvro(PTransform):
     """
     super(ReadFromAvro, self).__init__()
     self._args = (file_pattern, min_bundle_size)
-    self._validate = validate
+    self._source = _AvroSource(*self._args, validate=validate)
 
   def apply(self, pvalue):
-    return pvalue.pipeline | Read(_AvroSource(*self._args,
-                                              validate=self._validate))
+    return pvalue.pipeline | Read(self._source)
+
+  def display_data(self):
+    return {'source_dd': self._source}
 
 
 class _AvroUtils(object):
@@ -292,9 +294,13 @@ class WriteToAvro(beam.transforms.PTransform):
     """
     self._args = (file_path_prefix, schema, codec, file_name_suffix, num_shards,
                   shard_name_template, mime_type)
+    self._sink = _AvroSink(*self._args)
 
   def apply(self, pcoll):
-    return pcoll | beam.io.iobase.Write(_AvroSink(*self._args))
+    return pcoll | beam.io.iobase.Write(self._sink)
+
+  def display_data(self):
+    return {'sink_dd': self._sink}
 
 
 class _AvroSink(fileio.FileSink):
@@ -328,3 +334,9 @@ class _AvroSink(fileio.FileSink):
 
   def write_record(self, writer, value):
     writer.append(value)
+
+  def display_data(self):
+    res = super(self.__class__, self).display_data()
+    res['codec'] = str(self._codec)
+    res['schema'] = str(self._schema)
+    return res
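
A note on the 'source_dd'/'sink_dd' entries: display_data() may return a
whole sub-component, and the collector recurses into it to pick up that
component's own items. A simplified stand-in for the mechanism (class and
function names here are illustrative, not the SDK's):

    class HasDisplayData(object):
      def display_data(self):
        return {}

    class FakeSink(HasDisplayData):
      def __init__(self, codec, schema):
        self._codec, self._schema = codec, schema

      def display_data(self):
        return {'codec': str(self._codec), 'schema': str(self._schema)}

    class WriteTransform(HasDisplayData):
      def __init__(self, sink):
        self._sink = sink

      def display_data(self):
        # Returning the sink itself lets the collector pull in the sink's
        # codec/schema items without duplicating them here.
        return {'sink_dd': self._sink}

    def collect(component):
      items = {}
      for key, value in component.display_data().items():
        if isinstance(value, HasDisplayData):
          items.update(collect(value))  # recurse into sub-components
        else:
          items[key] = value
      return items

    print(collect(WriteTransform(FakeSink('deflate', '{"type": "record"}'))))
    # {'codec': 'deflate', 'schema': '{"type": "record"}'}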

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 9e356ca..e8fb12b 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -21,15 +21,20 @@ import os
 import tempfile
 import unittest
 
+import hamcrest as hc
+
 import apache_beam as beam
 from apache_beam.io import avroio
 from apache_beam.io import filebasedsource
 from apache_beam.io import source_test_utils
+from apache_beam.transforms.display import DisplayData
+from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 
 # Importing following private class for testing purposes.
 from apache_beam.io.avroio import _AvroSource as AvroSource
+from apache_beam.io.avroio import _AvroSink as AvroSink
 
 import avro.datafile
 from avro.datafile import DataFileWriter
@@ -144,6 +149,79 @@ class TestAvro(unittest.TestCase):
     expected_result = self.RECORDS
     self._run_avro_test(file_name, 100, True, expected_result)
 
+  def test_source_display_data(self):
+    file_name = 'some_avro_source'
+    source = AvroSource(file_name, validate=False)
+    dd = DisplayData.create_from(source)
+
+    # No extra avro parameters for AvroSource.
+    expected_items = [
+        DisplayDataItemMatcher('compression', 'auto'),
+        DisplayDataItemMatcher('filePattern', file_name)]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_read_display_data(self):
+    file_name = 'some_avro_source'
+    read = avroio.ReadFromAvro(file_name, validate=False)
+    dd = DisplayData.create_from(read)
+
+    # No extra avro parameters for AvroSource.
+    expected_items = [
+        DisplayDataItemMatcher('compression', 'auto'),
+        DisplayDataItemMatcher('filePattern', file_name)]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_sink_display_data(self):
+    file_name = 'some_avro_sink'
+    sink = AvroSink(file_name,
+                    self.SCHEMA,
+                    'null',
+                    '.end',
+                    0,
+                    None,
+                    'application/x-avro')
+    dd = DisplayData.create_from(sink)
+    expected_items = [
+        DisplayDataItemMatcher(
+            'schema',
+            str(self.SCHEMA)),
+        DisplayDataItemMatcher(
+            'filePattern',
+            'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d.end'),
+        DisplayDataItemMatcher(
+            'shards',
+            0),
+        DisplayDataItemMatcher(
+            'codec',
+            'null'),
+        DisplayDataItemMatcher(
+            'compression',
+            'uncompressed')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_write_display_data(self):
+    file_name = 'some_avro_sink'
+    write = avroio.WriteToAvro(file_name,
+                               self.SCHEMA)
+    dd = DisplayData.create_from(write)
+    expected_items = [
+        DisplayDataItemMatcher(
+            'schema',
+            str(self.SCHEMA)),
+        DisplayDataItemMatcher(
+            'filePattern',
+            'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d'),
+        DisplayDataItemMatcher(
+            'shards',
+            0),
+        DisplayDataItemMatcher(
+            'codec',
+            'deflate'),
+        DisplayDataItemMatcher(
+            'compression',
+            'uncompressed')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
   def test_read_reentrant_without_splitting(self):
     file_name = self._write_data()
     source = AvroSource(file_name)
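
These assertions compare display data with PyHamcrest rather than
assertEqual because item ordering is not guaranteed; contains_inanyorder
accepts any permutation of the expected matchers:

    import hamcrest as hc

    hc.assert_that([3, 1, 2], hc.contains_inanyorder(1, 2, 3))  # passes
    # A missing or extra element would raise AssertionError instead.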

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index ef20a7c..3b67c4f 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -169,7 +169,9 @@ class NativeFileSource(dataflow_io.NativeSource):
 
   def display_data(self):
     return {'filePattern': DisplayDataItem(self.file_path,
-                                           label="File Pattern")}
+                                           label="File Pattern"),
+            'compression': DisplayDataItem(str(self.compression_type),
+                                           label='Compression')}
 
   def __eq__(self, other):
     return (self.file_path == other.file_path and
@@ -799,6 +801,17 @@ class FileSink(iobase.Sink):
     self.compression_type = compression_type
     self.mime_type = mime_type
 
+  def display_data(self):
+    return {'shards':
+            DisplayDataItem(self.num_shards, label='Number of Shards'),
+            'compression':
+            DisplayDataItem(str(self.compression_type)),
+            'filePattern':
+            DisplayDataItem('{}{}{}'.format(self.file_path_prefix,
+                                            self.shard_name_format,
+                                            self.file_name_suffix),
+                            label='File Pattern')}
+
   def open(self, temp_path):
     """Opens ``temp_path``, returning an opaque file handle object.
 
@@ -1071,7 +1084,10 @@ class NativeFileSink(dataflow_io.NativeSink):
     file_name_pattern = '{}{}{}'.format(self.file_name_prefix,
                                         self.shard_name_template,
                                         self.file_name_suffix)
-    return {'filePattern':
+    return {'shards':
+            DisplayDataItem(self.num_shards,
+                            label='Number of Shards'),
+            'filePattern':
             DisplayDataItem(file_name_pattern,
                             label='File Name Pattern'),
             'compression':
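
The 'filePattern' item is the concatenation of prefix, shard-name format,
and suffix; substituting the two shard fields yields a concrete file name.
A quick sketch with illustrative values:

    file_path_prefix = 'gs://bucket/output'
    shard_name_format = '-%(shard_num)05d-of-%(num_shards)05d'
    file_name_suffix = '.txt'

    pattern = '{}{}{}'.format(file_path_prefix, shard_name_format,
                              file_name_suffix)
    print(pattern)
    # gs://bucket/output-%(shard_num)05d-of-%(num_shards)05d.txt

    print(pattern % {'shard_num': 0, 'num_shards': 3})
    # gs://bucket/output-00000-of-00003.txt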

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index b55fa19..63e71e0 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -68,7 +68,8 @@ class TestTextFileSource(unittest.TestCase):
     self.assertEqual(read_lines, output_lines)
     dd = DisplayData.create_from(source)
     expected_items = [
-        DisplayDataItemMatcher('filePattern', file_name)]
+        DisplayDataItemMatcher('filePattern', file_name),
+        DisplayDataItemMatcher('compression', 'auto')]
     hc.assert_that(dd.items,
                    hc.contains_inanyorder(*expected_items))
 
@@ -611,7 +612,10 @@ class TestNativeTextFileSink(unittest.TestCase):
             '{}{}'.format(self.path, '-SSSSS-of-NNNNN')),
         DisplayDataItemMatcher(
             'compression',
-            'auto')]
+            'auto'),
+        DisplayDataItemMatcher(
+            'shards',
+            0)]
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_text_file_display_data_suffix(self):
@@ -623,7 +627,10 @@ class TestNativeTextFileSink(unittest.TestCase):
             '{}{}{}'.format(self.path, '-SSSSS-of-NNNNN', '.pdf')),
         DisplayDataItemMatcher(
             'compression',
-            'auto')]
+            'auto'),
+        DisplayDataItemMatcher(
+            'shards',
+            0)]
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_write_text_file_empty(self):
@@ -651,7 +658,10 @@ class TestNativeTextFileSink(unittest.TestCase):
             '{}{}'.format(self.path, '-SSSSS-of-NNNNN')),
         DisplayDataItemMatcher(
             'compression',
-            'gzip')]
+            'gzip'),
+        DisplayDataItemMatcher(
+            'shards',
+            0)]
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_write_text_gzip_file_auto(self):
@@ -688,7 +698,10 @@ class TestNativeTextFileSink(unittest.TestCase):
             '{}{}'.format(self.path, '-SSSSS-of-NNNNN')),
         DisplayDataItemMatcher(
             'compression',
-            'bzip2')]
+            'bzip2'),
+        DisplayDataItemMatcher(
+            'shards',
+            0)]
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_write_text_bzip2_file_auto(self):
@@ -764,6 +777,23 @@ class TestFileSink(unittest.TestCase):
     # Check that any temp files are deleted.
     self.assertItemsEqual([shard1, shard2], glob.glob(temp_path + '*'))
 
+  def test_file_sink_display_data(self):
+    temp_path = tempfile.NamedTemporaryFile().name
+    sink = MyFileSink(
+        temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
+    dd = DisplayData.create_from(sink)
+    expected_items = [
+        DisplayDataItemMatcher(
+            'shards', 0),
+        DisplayDataItemMatcher(
+            'compression', 'auto'),
+        DisplayDataItemMatcher(
+            'filePattern',
+            '{}{}'.format(temp_path,
+                          '-%(shard_num)05d-of-%(num_shards)05d.foo'))]
+
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
   def test_empty_write(self):
     temp_path = tempfile.NamedTemporaryFile().name
     sink = MyFileSink(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index a0de131..b7cac3e 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -42,7 +42,8 @@ from apache_beam.pvalue import AsSingleton
 from apache_beam.transforms import core
 from apache_beam.transforms import ptransform
 from apache_beam.transforms import window
-from apache_beam.transforms.display import HasDisplayData, DisplayDataItem
+from apache_beam.transforms.display import HasDisplayData
+from apache_beam.transforms.display import DisplayDataItem
 
 
 # Encapsulates information about a bundle of a source generated when method
@@ -414,7 +415,7 @@ class RangeTracker(object):
     raise NotImplementedError
 
 
-class Sink(object):
+class Sink(HasDisplayData):
   """A resource that can be written to using the ``df.io.Write`` transform.
 
   Here ``df`` stands for Dataflow Python code imported in following manner.
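
With Sink deriving from HasDisplayData, user-defined sinks inherit a
default (empty) display_data() and may override it. A hypothetical
subclass, assuming the SDK at this revision is importable (the sink below
is not part of the SDK):

    from apache_beam.io import iobase

    class KeyValueStoreSink(iobase.Sink):  # hypothetical example sink
      def __init__(self, table_name):
        self._table_name = table_name

      def display_data(self):
        return {'table': self._table_name}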

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index e031572..4e94f87 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -25,6 +25,7 @@ from apache_beam.io import fileio
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
 from apache_beam.transforms import PTransform
+from apache_beam.transforms.display import DisplayDataItem
 
 __all__ = ['ReadFromText', 'WriteToText']
 
@@ -237,13 +238,21 @@ class ReadFromText(PTransform):
     """
 
     super(ReadFromText, self).__init__(**kwargs)
-    self._args = (file_pattern, min_bundle_size, compression_type,
-                  strip_trailing_newlines, coder)
-    self._validate = validate
+    self._file_pattern = file_pattern
+    self._min_bundle_size = min_bundle_size
+    self._compression_type = compression_type
+    self._strip_trailing_newlines = strip_trailing_newlines
+    self._coder = coder
+    self._source = _TextSource(file_pattern, min_bundle_size, compression_type,
+                               strip_trailing_newlines, coder, validate=validate)
 
   def apply(self, pvalue):
-    return pvalue.pipeline | Read(_TextSource(*self._args,
-                                              validate=self._validate))
+    return pvalue.pipeline | Read(self._source)
+
+  def display_data(self):
+    return {'source_dd': self._source,
+            'strip_nwln': DisplayDataItem(self._strip_trailing_newlines,
+                                          label='Strip Trailing New Lines')}
 
 
 class WriteToText(PTransform):
@@ -292,6 +301,10 @@ class WriteToText(PTransform):
 
     self._args = (file_path_prefix, file_name_suffix, append_trailing_newlines,
                   num_shards, shard_name_template, coder, compression_type)
+    self._sink = _TextSink(*self._args)
 
   def apply(self, pcoll):
-    return pcoll | Write(_TextSink(*self._args))
+    return pcoll | Write(self._sink)
+
+  def display_data(self):
+    return {'sink_dd': self._sink}
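
Both transforms can now be introspected the same way the runner does in
apiclient.py above; a usage sketch, assuming the SDK at this revision:

    from apache_beam.io.textio import ReadFromText
    from apache_beam.transforms.display import DisplayData

    read = ReadFromText('gs://bucket/*.txt', validate=False)
    for item in DisplayData.create_from(read).items:
      # One dict per item: filePattern, compression, strip_nwln, ...
      print(item.get_dict())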

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 81d04ab..acdac47 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -25,6 +25,8 @@ import os
 import tempfile
 import unittest
 
+import hamcrest as hc
+
 import apache_beam as beam
 import apache_beam.io.source_test_utils as source_test_utils
 
@@ -41,6 +43,9 @@ from apache_beam.io.filebasedsource_test import write_data
 from apache_beam.io.filebasedsource_test import write_pattern
 from apache_beam.io.fileio import CompressionTypes
 
+from apache_beam.transforms.display import DisplayData
+from apache_beam.transforms.display_test import DisplayDataItemMatcher
+
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 
@@ -247,6 +252,15 @@ class TextSourceTest(unittest.TestCase):
         splits[0].source, splits[0].start_position, splits[0].stop_position,
         perform_multi_threaded_test=False)
 
+  def test_read_display_data(self):
+    read = ReadFromText('prefix', validate=False)
+    dd = DisplayData.create_from(read)
+    expected_items = [
+        DisplayDataItemMatcher('compression', 'auto'),
+        DisplayDataItemMatcher('filePattern', 'prefix'),
+        DisplayDataItemMatcher('strip_nwln', True)]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
   def test_dataflow_single_file(self):
     file_name, expected_data = write_data(5)
     assert len(expected_data) == 5
@@ -450,6 +464,20 @@ class TextSinkTest(unittest.TestCase):
     with gzip.GzipFile(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), [])
 
+  def test_write_display_data(self):
+    write = WriteToText('prefix')
+    dd = DisplayData.create_from(write)
+    expected_items = [
+        DisplayDataItemMatcher(
+            'compression', 'auto'),
+        DisplayDataItemMatcher(
+            'shards', 0),
+        DisplayDataItemMatcher(
+            'filePattern',
+            '{}{}'.format('prefix',
+                          '-%(shard_num)05d-of-%(num_shards)05d'))]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
   def test_write_dataflow(self):
     pipeline = beam.Pipeline('DirectPipelineRunner')
     pcoll = pipeline | beam.core.Create('Create', self.lines)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 013796c..c50f04d 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -295,12 +295,14 @@ class PipelineOptionsTest(unittest.TestCase):
   def test_dir(self):
     options = Breakfast()
     self.assertEquals(
-        ['from_dictionary', 'get_all_options', 'slices', 'style', 'view_as'],
-        [attr for attr in dir(options) if not attr.startswith('_')])
+        set(['from_dictionary', 'get_all_options', 'slices', 'style',
+             'view_as', 'display_data']),
+        set([attr for attr in dir(options) if not attr.startswith('_')]))
     self.assertEquals(
-        ['from_dictionary', 'get_all_options', 'style', 'view_as'],
-        [attr for attr in dir(options.view_as(Eggs))
-         if not attr.startswith('_')])
+        set(['from_dictionary', 'get_all_options', 'style', 'view_as',
+             'display_data']),
+        set([attr for attr in dir(options.view_as(Eggs))
+             if not attr.startswith('_')]))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index a0604b8..22d2b3e 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -25,6 +25,7 @@ import random
 from apache_beam.transforms import core
 from apache_beam.transforms import cy_combiners
 from apache_beam.transforms import ptransform
+from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.typehints import Any
 from apache_beam.typehints import Dict
 from apache_beam.typehints import KV
@@ -282,6 +283,13 @@ class TopCombineFn(core.CombineFn):
       buffer.sort(cmp=lambda a, b: (not lt(a, b)) - (not lt(b, a)),
                   key=self._key_fn)
 
+  def display_data(self):
+    return {'n': self._n,
+            'compare': DisplayDataItem(self._compare.__name__
+                                       if hasattr(self._compare, '__name__')
+                                       else self._compare.__class__.__name__)
+                       .drop_if_none()}
+
   # The accumulator type is a tuple (threshold, buffer), where threshold
   # is the smallest element [key] that could possibly be in the top n based
   # on the elements observed so far, and buffer is a (periodically sorted)
@@ -413,6 +421,12 @@ class _TupleCombineFnBase(core.CombineFn):
 
   def __init__(self, *combiners):
     self._combiners = [core.CombineFn.maybe_from_callable(c) for c in combiners]
+    self._named_combiners = combiners
+
+  def display_data(self):
+    combiners = [c.__name__ if hasattr(c, '__name__') else c.__class__.__name__
+                 for c in self._named_combiners]
+    return {'combiners': str(combiners)}
 
   def create_accumulator(self):
     return [c.create_accumulator() for c in self._combiners]
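
The name-resolution expression used in both display_data() methods above
distinguishes plain callables from CombineFn instances:

    class MeanCombineFn(object):  # stand-in for the SDK class
      pass

    def describe(c):
      # Functions and builtins carry __name__; instances fall back to
      # their class name.
      return c.__name__ if hasattr(c, '__name__') else c.__class__.__name__

    print([describe(c) for c in (max, MeanCombineFn(), sum)])
    # ['max', 'MeanCombineFn', 'sum'] -- stringified, this is the value the
    # TupleCombineFn test below expects.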

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 2858d0d..d28c63f 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -19,12 +19,16 @@
 
 import unittest
 
+import hamcrest as hc
+
 import apache_beam as beam
 from apache_beam.pipeline import Pipeline
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.core import CombineGlobally
 from apache_beam.transforms.core import Create
 from apache_beam.transforms.core import Map
+from apache_beam.transforms.display import DisplayData
+from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.util import assert_that, equal_to
 
@@ -137,6 +141,65 @@ class CombineTest(unittest.TestCase):
                     [range(1000), range(100), range(1001)],
                     [1000, 999, 999, 998, 998])
 
+  def test_combine_per_key_top_display_data(self):
+    def individual_test_per_key_dd(combineFn):
+      transform = beam.CombinePerKey(combineFn)
+      dd = DisplayData.create_from(transform)
+      expected_items = [
+          DisplayDataItemMatcher('combineFn', combineFn.__class__),
+          DisplayDataItemMatcher('n', combineFn._n),
+          DisplayDataItemMatcher('compare', combineFn._compare.__name__)]
+      hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+    individual_test_per_key_dd(combine.Largest(5))
+    individual_test_per_key_dd(combine.Smallest(3))
+    individual_test_per_key_dd(combine.TopCombineFn(8))
+    individual_test_per_key_dd(combine.Largest(5))
+
+  def test_combine_sample_display_data(self):
+    def individual_test_per_key_dd(sampleFn, args, kwargs):
+      trs = [beam.CombinePerKey(sampleFn(*args, **kwargs)),
+             beam.CombineGlobally(sampleFn(*args, **kwargs))]
+      for transform in trs:
+        dd = DisplayData.create_from(transform)
+        expected_items = [
+            DisplayDataItemMatcher('fn', sampleFn.fn.__name__),
+            DisplayDataItemMatcher('combineFn',
+                                   transform.fn.__class__)]
+        if len(args) > 0:
+          expected_items.append(
+              DisplayDataItemMatcher('args', str(args)))
+        if len(kwargs) > 0:
+          expected_items.append(
+              DisplayDataItemMatcher('kwargs', str(kwargs)))
+        hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+    individual_test_per_key_dd(combine.Sample.FixedSizePerKey,
+                               args=(5,),
+                               kwargs={})
+    individual_test_per_key_dd(combine.Sample.FixedSizeGlobally,
+                               args=(8,),
+                               kwargs={'arg':  9})
+
+  def test_combine_globally_display_data(self):
+    transform = beam.CombineGlobally(combine.Smallest(5))
+    dd = DisplayData.create_from(transform)
+    expected_items = [
+        DisplayDataItemMatcher('combineFn', combine.Smallest),
+        DisplayDataItemMatcher('n', 5),
+        DisplayDataItemMatcher('compare', 'gt')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_basic_combiners_display_data(self):
+    transform = beam.CombineGlobally(
+        combine.TupleCombineFn(max, combine.MeanCombineFn(), sum))
+    dd = DisplayData.create_from(transform)
+    expected_items = [
+        DisplayDataItemMatcher('combineFn', combine.TupleCombineFn),
+        DisplayDataItemMatcher('combiners',
+                               "['max', 'MeanCombineFn', 'sum']")]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
   def test_top_shorthands(self):
     pipeline = Pipeline('DirectPipelineRunner')
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 3189de7..ffcdd10 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -269,7 +269,7 @@ class CallableWrapperDoFn(DoFn):
     return getattr(self._fn, '_argspec_fn', self._fn)
 
 
-class CombineFn(WithTypeHints):
+class CombineFn(WithTypeHints, HasDisplayData):
   """A function object used by a Combine transform with custom processing.
 
   A CombineFn specifies how multiple values in all or part of a PCollection can
@@ -415,6 +415,9 @@ class CallableWrapperCombineFn(CombineFn):
     super(CallableWrapperCombineFn, self).__init__()
     self._fn = fn
 
+  def display_data(self):
+    return {'fn_dd': self._fn}
+
   def __repr__(self):
     return "CallableWrapperCombineFn(%s)" % self._fn
 
@@ -828,6 +831,12 @@ class CombineGlobally(PTransform):
     self.args = args
     self.kwargs = kwargs
 
+  def display_data(self):
+    return {'combineFn':
+            DisplayDataItem(self.fn.__class__, label='Combine Function'),
+            'combineFn_dd':
+            self.fn}
+
   def default_label(self):
     return 'CombineGlobally(%s)' % ptransform.label_from_callable(self.fn)
 
@@ -914,6 +923,11 @@ class CombinePerKey(PTransformWithSideInputs):
   Returns:
     A PObject holding the result of the combine operation.
   """
+  def display_data(self):
+    return {'combineFn':
+            DisplayDataItem(self.fn.__class__, label='Combine Function'),
+            'combineFn_dd':
+            self.fn}
 
   def make_fn(self, fn):
     self._fn_label = ptransform.label_from_callable(fn)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 0885f55..2212d00 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -48,6 +48,7 @@ from apache_beam import typehints
 from apache_beam.internal import pickler
 from apache_beam.internal import util
 from apache_beam.transforms.display import HasDisplayData
+from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.typehints import getcallargs_forhints
 from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints import validate_composite_type_param
@@ -629,6 +630,14 @@ class CallablePTransform(PTransform):
     self._args = ()
     self._kwargs = {}
 
+  def display_data(self):
+    res = {'fn': (self.fn.__name__
+                  if hasattr(self.fn, '__name__')
+                  else self.fn.__class__),
+           'args': DisplayDataItem(str(self._args)).drop_if_default('()'),
+           'kwargs': DisplayDataItem(str(self._kwargs)).drop_if_default('{}')}
+    return res
+
   def __call__(self, *args, **kwargs):
     if args and args[0] is None:
       label, self._args = None, args[1:]
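
A sketch of the drop_if_default behavior relied on above: an item whose
value equals the supplied default is withheld from the collected output.
This is a simplified stand-in, not the SDK class:

    class DisplayDataItem(object):  # illustrative stand-in
      def __init__(self, value):
        self.value = value
        self.dropped = False

      def drop_if_default(self, default):
        if self.value == default:
          self.dropped = True
        return self

    args, kwargs = (), {}
    items = {'args': DisplayDataItem(str(args)).drop_if_default('()'),
             'kwargs': DisplayDataItem(str(kwargs)).drop_if_default('{}')}
    visible = {k: v.value for k, v in items.items() if not v.dropped}
    print(visible)  # {} -- empty args/kwargs never reach the UI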

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
index f68335b..aacb186 100644
--- a/sdks/python/apache_beam/utils/options.py
+++ b/sdks/python/apache_beam/utils/options.py
@@ -22,8 +22,10 @@ TODO(silviuc): Should rename this module to pipeline_options.
 
 import argparse
 
+from apache_beam.transforms.display import HasDisplayData
 
-class PipelineOptions(object):
+
+class PipelineOptions(HasDisplayData):
   """Pipeline options class used as container for command line options.
 
   The class is essentially a wrapper over the standard argparse Python module
@@ -104,12 +106,16 @@ class PipelineOptions(object):
 
     return cls(flags)
 
-  def get_all_options(self):
+  def get_all_options(self, drop_default=False):
     """Returns a dictionary of all defined arguments.
 
     Returns a dictionary of all defined arguments (arguments that are defined in
     any subclass of PipelineOptions) into a dictionary.
 
+    Args:
+      drop_default: If set to true, options that are equal to their default
+        values are not returned as part of the result dictionary.
+
     Returns:
       Dictionary of all args and values.
     """
@@ -120,12 +126,17 @@ class PipelineOptions(object):
     result = vars(known_args)
 
     # Apply the overrides if any
-    for k in result:
+    for k in result.keys():
       if k in self._all_options:
         result[k] = self._all_options[k]
+      if drop_default and parser.get_default(k) == result[k]:
+        del result[k]
 
     return result
 
+  def display_data(self):
+    return self.get_all_options(True)
+
   def view_as(self, cls):
     view = cls(self._flags)
     view._all_options = self._all_options
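
The drop_default filtering reduces to plain argparse: values still equal
to the parser's defaults are removed before being surfaced as display
data. (Iterating over a snapshot of the keys is what makes deleting
entries mid-loop safe; result.keys() returns a list under Python 2, which
is why the loop header changed.)

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--num_workers', type=int, default=1)
    parser.add_argument('--mock_flag', action='store_true')

    known_args, _ = parser.parse_known_args(['--num_workers', '5'])
    result = vars(known_args)

    for k in list(result.keys()):  # snapshot: safe to delete while looping
      if parser.get_default(k) == result[k]:
        del result[k]              # drop options still at their default

    print(result)  # {'num_workers': 5} -- mock_flag stayed at False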

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py
index 3b70e1e..ed55362 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_test.py
@@ -20,6 +20,9 @@
 import logging
 import unittest
 
+import hamcrest as hc
+from apache_beam.transforms.display import DisplayData
+from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.utils.options import PipelineOptions
 
 
@@ -27,25 +30,48 @@ class PipelineOptionsTest(unittest.TestCase):
 
   TEST_CASES = [
       {'flags': ['--num_workers', '5'],
-       'expected': {'num_workers': 5, 'mock_flag': False, 'mock_option': None}},
+       'expected': {'num_workers': 5, 'mock_flag': False, 'mock_option': None},
+       'display_data': [DisplayDataItemMatcher('num_workers', 5)]},
       {
           'flags': [
               '--profile_cpu', '--profile_location', 'gs://bucket/', 'ignored'],
           'expected': {
               'profile_cpu': True, 'profile_location': 'gs://bucket/',
-              'mock_flag': False, 'mock_option': None}
+              'mock_flag': False, 'mock_option': None},
+          'display_data': [
+              DisplayDataItemMatcher('profile_cpu',
+                                     True),
+              DisplayDataItemMatcher('profile_location',
+                                     'gs://bucket/')]
       },
       {'flags': ['--num_workers', '5', '--mock_flag'],
-       'expected': {'num_workers': 5, 'mock_flag': True, 'mock_option': None}},
+       'expected': {'num_workers': 5, 'mock_flag': True, 'mock_option': None},
+       'display_data': [
+           DisplayDataItemMatcher('num_workers', 5),
+           DisplayDataItemMatcher('mock_flag', True)]
+      },
       {'flags': ['--mock_option', 'abc'],
-       'expected': {'mock_flag': False, 'mock_option': 'abc'}},
+       'expected': {'mock_flag': False, 'mock_option': 'abc'},
+       'display_data': [
+           DisplayDataItemMatcher('mock_option', 'abc')]
+      },
       {'flags': ['--mock_option', ' abc def '],
-       'expected': {'mock_flag': False, 'mock_option': ' abc def '}},
+       'expected': {'mock_flag': False, 'mock_option': ' abc def '},
+       'display_data': [
+           DisplayDataItemMatcher('mock_option', ' abc def ')]
+      },
       {'flags': ['--mock_option= abc xyz '],
-       'expected': {'mock_flag': False, 'mock_option': ' abc xyz '}},
+       'expected': {'mock_flag': False, 'mock_option': ' abc xyz '},
+       'display_data': [
+           DisplayDataItemMatcher('mock_option', ' abc xyz ')]
+      },
       {'flags': ['--mock_option=gs://my bucket/my folder/my file'],
        'expected': {'mock_flag': False,
-                    'mock_option': 'gs://my bucket/my folder/my file'}},
+                    'mock_option': 'gs://my bucket/my folder/my file'},
+       'display_data': [
+           DisplayDataItemMatcher(
+               'mock_option', 'gs://my bucket/my folder/my file')]
+      },
   ]
 
   # Used for testing newly added flags.
@@ -57,6 +83,12 @@ class PipelineOptionsTest(unittest.TestCase):
       parser.add_argument('--mock_option', help='mock option')
       parser.add_argument('--option with space', help='mock option with space')
 
+  def test_display_data(self):
+    for case in PipelineOptionsTest.TEST_CASES:
+      options = PipelineOptions(flags=case['flags'])
+      dd = DisplayData.create_from(options)
+      hc.assert_that(dd.items, hc.contains_inanyorder(*case['display_data']))
+
   def test_get_all_options(self):
     for case in PipelineOptionsTest.TEST_CASES:
       options = PipelineOptions(flags=case['flags'])

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 4010b06..c7b940d 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -95,6 +95,9 @@ REQUIRED_PACKAGES = [
     'python-gflags>=2.0,<4.0.0',
     'pyyaml>=3.10,<4.0.0',
     ]
+REQUIRED_TEST_PACKAGES = [
+    'pyhamcrest>=1.9,<2.0',
+    ]
 
 
 REQUIRED_TEST_PACKAGES = [

