Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk c8cef2cba -> a1a51c3c1


Update some of the example tests to use assert_that


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84fef464
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84fef464
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84fef464

Branch: refs/heads/python-sdk
Commit: 84fef464b0abea41e318c0fe983ac43874e5f6ad
Parents: c8cef2c
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jul 13 16:28:46 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Thu Jul 14 17:37:02 2016 -0700

----------------------------------------------------------------------
 .../examples/complete/autocomplete_test.py      | 34 ++-------
 .../examples/complete/estimate_pi.py            | 19 +++--
 .../examples/complete/estimate_pi_test.py       | 36 +++++-----
 .../examples/cookbook/coders_test.py            | 33 +++------
 .../examples/cookbook/custom_ptransform.py      | 74 ++++++++++----------
 .../examples/cookbook/custom_ptransform_test.py | 41 ++++-------
 .../examples/cookbook/group_with_coder_test.py  | 70 ++++++++----------
 sdks/python/apache_beam/transforms/util.py      | 13 ++++
 8 files changed, 139 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index bd0a6cb..1b3ee5f 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -15,43 +15,17 @@
 # limitations under the License.
 #
 
-"""Test for the wordcount example."""
+"""Test for the autocomplete example."""
 
-import collections
 import unittest
 
-
 import apache_beam as beam
 from apache_beam.examples.complete import autocomplete
-from apache_beam.pvalue import AsIter
-
-# TODO(robertwb): Move to testing utilities.
-
-
-def assert_that(pcoll, matcher):
-  """Asserts that the give PCollection satisfies the constraints of the matcher
-  in a way that is runnable locally or on a remote service.
-  """
-  singleton = pcoll.pipeline | beam.Create('create_singleton', [None])
-
-  def check_matcher(_, side_value):
-    assert matcher(side_value)
-    return []
-  singleton | beam.FlatMap(check_matcher, AsIter(pcoll))  # pylint: disable=expression-not-assigned
-
-
-def contains_in_any_order(expected):
-  def matcher(value):
-    vs = collections.Counter(value)
-    es = collections.Counter(expected)
-    if vs != es:
-      raise ValueError(
-          'extra: %s, missing: %s' % (vs - es, es - vs))
-    return True
-  return matcher
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import contains_in_any_order
 
 
-class WordCountTest(unittest.TestCase):
+class AutocompleteTest(unittest.TestCase):
 
   WORDS = ['this', 'this', 'that', 'to', 'to', 'to']
 

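Note: the helpers now come from apache_beam.transforms.util. A minimal
sketch of how they are used in a test, against this era's API; the pipeline
contents and labels below are illustrative, not the actual autocomplete test:

    import apache_beam as beam
    from apache_beam.transforms.util import assert_that
    from apache_beam.transforms.util import contains_in_any_order

    p = beam.Pipeline('DirectPipelineRunner')
    words = p | beam.Create('create', ['this', 'this', 'that'])
    counts = (words
              | beam.Map('pair', lambda w: (w, 1))
              | beam.CombinePerKey(sum))
    # The check runs inside the pipeline; a mismatch raises at p.run().
    assert_that(counts, contains_in_any_order([('this', 2), ('that', 1)]))
    p.run()
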
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 8b0f202..3c4a2d9 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -85,6 +85,20 @@ class JsonCoder(object):
     return json.dumps(x)
 
 
+class EstimatePiTransform(beam.PTransform):
+  """Runs 10M trials, and combine the results to estimate pi."""
+
+  def __init__(self, label):
+    super(EstimatePiTransform, self).__init__(label)
+
+  def apply(self, pcoll):
+    # A hundred work items of a hundred thousand tries each.
+    return (pcoll
+            | beam.Create('Initialize', [100000] * 100).with_output_types(int)
+            | beam.Map('Run trials', run_trials)
+            | beam.CombineGlobally('Sum', combine_results).without_defaults())
+
+
 def run(argv=None):
 
   parser = argparse.ArgumentParser()
@@ -94,11 +108,8 @@ def run(argv=None):
   known_args, pipeline_args = parser.parse_known_args(argv)
 
   p = beam.Pipeline(argv=pipeline_args)
-  # A thousand work items of a million tries each.
   (p  # pylint: disable=expression-not-assigned
-   | beam.Create('Initialize', [100000] * 100).with_output_types(int)
-   | beam.Map('Run trials', run_trials)
-   | beam.CombineGlobally('Sum', combine_results).without_defaults()
+   | EstimatePiTransform('Estimate')
    | beam.io.Write('Write',
                    beam.io.TextFileSink(known_args.output,
                                         coder=JsonCoder())))

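Note: extracting the pipeline body into EstimatePiTransform makes it
reusable and directly testable without file I/O. A minimal sketch of
applying it elsewhere; the 'extract' step is illustrative and assumes each
result is a (total, inside, estimate) triple, as the test below unpacks:

    p = beam.Pipeline('DirectPipelineRunner')
    estimates = (p
                 | estimate_pi.EstimatePiTransform('Estimate')
                 | beam.Map('extract', lambda (total, inside, estimate): estimate))
    p.run()
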
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index ebebadc..7ca82d7 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -17,31 +17,33 @@
 
 """Test for the estimate_pi example."""
 
-import json
-import logging
-import tempfile
 import unittest
 
+import apache_beam as beam
 from apache_beam.examples.complete import estimate_pi
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import DataflowAssertException
 
 
-class EstimatePiTest(unittest.TestCase):
+def in_between(lower, upper):
+  def _in_between(actual):
+    _, _, estimate = actual[0]
+    if estimate < lower or estimate > upper:
+      raise DataflowAssertException(
+          'Failed assert: %f not in [%f, %f]' % (estimate, lower, upper))
+  return _in_between
+
 
-  def create_temp_file(self, contents):
-    with tempfile.NamedTemporaryFile(delete=False) as f:
-      f.write(contents)
-      return f.name
+class EstimatePiTest(unittest.TestCase):
 
   def test_basics(self):
-    temp_path = self.create_temp_file('result')
-    estimate_pi.run([
-        '--output=%s' % temp_path])
-    # Parse result file and compare.
-    with open(temp_path + '-00000-of-00001') as result_file:
-      estimated_pi = json.loads(result_file.readline())[2]
-      # Note: Probabilistically speaking this test can fail with a probability
-      # that is very small (VERY) given that we run at least 10 million trials.
-      self.assertTrue(estimated_pi > 3.13 and estimated_pi < 3.15)
+    p = beam.Pipeline('DirectPipelineRunner')
+    result = p | estimate_pi.EstimatePiTransform('Estimate')
+
+    # Note: Probabilistically speaking this test can fail with a probability
+    # that is very small (VERY) given that we run at least 10 million trials.
+    assert_that(result, in_between(3.13, 3.15))
+    p.run()
 
 
 if __name__ == '__main__':

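Note: in_between illustrates the matcher contract that assert_that expects:
any callable taking the list of actual elements and raising
DataflowAssertException on mismatch will do. For example, a hypothetical
matcher asserting a minimum element count could follow the same shape:

    from apache_beam.transforms.util import DataflowAssertException

    def at_least(n):
      def _at_least(actual):
        if len(actual) < n:
          raise DataflowAssertException(
              'Failed assert: %d < %d elements' % (len(actual), n))
      return _at_least
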
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index 904a967..5840081 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -17,12 +17,13 @@
 
 """Test for the coders example."""
 
-import json
 import logging
-import tempfile
 import unittest
 
+import apache_beam as beam
 from apache_beam.examples.cookbook import coders
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
 
 
 class CodersTest(unittest.TestCase):
@@ -32,26 +33,14 @@ class CodersTest(unittest.TestCase):
       {'host': ['Germany', 1], 'guest': ['Brasil', 3]},
       {'host': ['Brasil', 1], 'guest': ['Italy', 0]}]
 
-  def create_temp_file(self, records):
-    with tempfile.NamedTemporaryFile(delete=False) as f:
-      for record in records:
-        f.write('%s\n' % json.dumps(record))
-      return f.name
-
-  def test_basics(self):
-    temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
-    coders.run([
-        '--input=%s*' % temp_path,
-        '--output=%s.result' % temp_path])
-    # Parse result file and compare.
-    results = []
-    with open(temp_path + '.result-00000-of-00001') as result_file:
-      for line in result_file:
-        results.append(json.loads(line))
-      logging.info('result: %s', results)
-    self.assertEqual(
-        sorted(results),
-        sorted([['Italy', 0], ['Brasil', 6], ['Germany', 3]]))
+  def test_compute_points(self):
+    p = beam.Pipeline('DirectPipelineRunner')
+    records = p | beam.Create('create', self.SAMPLE_RECORDS)
+    result = (records
+              | beam.FlatMap('points', coders.compute_points)
+              | beam.CombinePerKey(sum))
+    assert_that(result, equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)]))
+    p.run()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index 8da1f43..d3d8b08 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -33,69 +33,67 @@ from apache_beam.utils.options import PipelineOptions
 # pylint doesn't understand our pipeline syntax:
 # pylint:disable=expression-not-assigned
 
+class Count1(beam.PTransform):
+  """Count as a subclass of PTransform, with an apply method."""
 
-def run_count1(known_args, options):
-  """Runs the first example pipeline."""
-
-  class Count(beam.PTransform):
-    """Count as a subclass of PTransform, with an apply method."""
+  def apply(self, pcoll):
+    return (
+        pcoll
+        | beam.Map('Init', lambda v: (v, 1))
+        | beam.CombinePerKey(sum))
 
-    def apply(self, pcoll):
-      return (
-          pcoll
-          | beam.Map('Init', lambda v: (v, 1))
-          | beam.CombinePerKey(sum))
 
+def run_count1(known_args, options):
+  """Runs the first example pipeline."""
   logging.info('Running first pipeline')
   p = beam.Pipeline(options=options)
-  (p | beam.io.Read(beam.io.TextFileSource(known_args.input)) | Count()
+  (p | beam.io.Read(beam.io.TextFileSource(known_args.input)) | Count1()
    | beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 
 
-def run_count2(known_args, options):
-  """Runs the second example pipeline."""
+@beam.ptransform_fn
+def Count2(pcoll):  # pylint: disable=invalid-name
+  """Count as a decorated function."""
+  return (
+      pcoll
+      | beam.Map('Init', lambda v: (v, 1))
+      | beam.CombinePerKey(sum))
 
-  @beam.ptransform_fn
-  def Count(pcoll):      # pylint: disable=invalid-name
-    """Count as a decorated function."""
-    return (
-        pcoll
-        | beam.Map('Init', lambda v: (v, 1))
-        | beam.CombinePerKey(sum))
 
+def run_count2(known_args, options):
+  """Runs the second example pipeline."""
   logging.info('Running second pipeline')
   p = beam.Pipeline(options=options)
   (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
-   | Count()  # pylint: disable=no-value-for-parameter
+   | Count2()  # pylint: disable=no-value-for-parameter
    | beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 
 
-def run_count3(known_args, options):
-  """Runs the third example pipeline."""
+@beam.ptransform_fn
+def Count3(pcoll, factor=1):  # pylint: disable=invalid-name
+  """Count as a decorated function with a side input.
 
-  @beam.ptransform_fn
-  # pylint: disable=invalid-name
-  def Count(pcoll, factor=1):
-    """Count as a decorated function with a side input.
+  Args:
+    pcoll: the PCollection passed in from the previous transform
+    factor: the amount by which to count
 
-    Args:
-      pcoll: the PCollection passed in from the previous transform
-      factor: the amount by which to count
+  Returns:
+    A PCollection counting the number of times each unique element occurs.
+  """
+  return (
+      pcoll
+      | beam.Map('Init', lambda v: (v, factor))
+      | beam.CombinePerKey(sum))
 
-    Returns:
-      A PCollection counting the number of times each unique element occurs.
-    """
-    return (
-        pcoll
-        | beam.Map('Init', lambda v: (v, factor))
-        | beam.CombinePerKey(sum))
 
+def run_count3(known_args, options):
+  """Runs the third example pipeline."""
   logging.info('Running third pipeline')
   p = beam.Pipeline(options=options)
   (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
-   | Count(2)  # pylint: disable=no-value-for-parameter
+   | Count3(2)  # pylint: disable=no-value-for-parameter
    | beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 

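Note: beam.ptransform_fn turns Count2 and Count3 into PTransform factories:
calling the factory binds the remaining arguments and '|' supplies the
pcoll, which is why the call sites above carry the no-value-for-parameter
pylint disable. A one-line sketch, with words standing in for any
PCollection of elements:

    counts = words | Count3(2)  # binds factor=2; pcoll comes from words
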
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 875a99f..3c0c6f3 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -18,48 +18,33 @@
 """Tests for the various custom Count implementation examples."""
 
 import logging
-import tempfile
 import unittest
 
+import apache_beam as beam
 from apache_beam.examples.cookbook import custom_ptransform
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
 
 
 class CustomCountTest(unittest.TestCase):
 
   def test_count1(self):
-    self.run_pipeline(custom_ptransform.run_count1)
+    self.run_pipeline(custom_ptransform.Count1())
 
   def test_count2(self):
-    self.run_pipeline(custom_ptransform.run_count2)
+    self.run_pipeline(custom_ptransform.Count2())
 
   def test_count3(self):
-    self.run_pipeline(custom_ptransform.run_count3, factor=2)
+    factor = 2
+    self.run_pipeline(custom_ptransform.Count3(factor), factor=factor)
 
   def run_pipeline(self, count_implementation, factor=1):
-    input_path = self.create_temp_file('CAT\nDOG\nCAT\nCAT\nDOG\n')
-    output_path = input_path + '.result'
-
-    known_args, pipeline_args = custom_ptransform.get_args([
-        '--input=%s*' % input_path, '--output=%s' % output_path
-    ])
-
-    count_implementation(known_args, PipelineOptions(pipeline_args))
-    self.assertEqual(["(u'CAT', %d)" % (3 * factor),
-                      "(u'DOG', %d)" % (2 * factor)],
-                     self.get_output(output_path + '-00000-of-00001'))
-
-  def create_temp_file(self, contents=''):
-    with tempfile.NamedTemporaryFile(delete=False) as f:
-      f.write(contents)
-      return f.name
-
-  def get_output(self, path):
-    logging.info('Reading output from "%s"', path)
-    lines = []
-    with open(path) as f:
-      lines = f.readlines()
-    return sorted(s.rstrip('\n') for s in lines)
+    p = beam.Pipeline('DirectPipelineRunner')
+    words = p | beam.Create('create', ['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
+    result = words | count_implementation
+    assert_that(
+        result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))
+    p.run()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
index fb52809..07211a9 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
@@ -18,10 +18,13 @@
 """Test for the custom coders example."""
 
 import logging
-import tempfile
 import unittest
 
+import apache_beam as beam
 from apache_beam.examples.cookbook import group_with_coder
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
+from apache_beam.utils.options import OptionsContext
 
 
 # Patch group_with_coder.PlayerCoder.decode(). To test that the PlayerCoder was
@@ -36,54 +39,37 @@ class GroupWithCoderTest(unittest.TestCase):
       'joe,20', 'fred,6', 'ann,5',
       'joe,30', 'ann,10', 'mary,1']
 
-  def create_temp_file(self, records):
-    with tempfile.NamedTemporaryFile(delete=False) as f:
-      for record in records:
-        f.write('%s\n' % record)
-      return f.name
-
-  def test_basics_with_type_check(self):
-    # Run the workflow with --pipeline_type_check option. This will make sure
+  @OptionsContext(pipeline_type_check=True)
+  def test_basics_with_type_check(self):
+    # Run the workflow with pipeline_type_check option. This will make sure
     # the typehints associated with all transforms will have non-default values
     # and therefore any custom coders will be used. In our case we want to make
     # sure the coder for the Player class will be used.
-    temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
-    group_with_coder.run([
-        '--pipeline_type_check',
-        '--input=%s*' % temp_path,
-        '--output=%s.result' % temp_path])
-    # Parse result file and compare.
-    results = []
-    with open(temp_path + '.result-00000-of-00001') as result_file:
-      for line in result_file:
-        name, points = line.split(',')
-        results.append((name, int(points)))
-      logging.info('result: %s', results)
-    self.assertEqual(
-        sorted(results),
-        sorted([('x:ann', 15), ('x:fred', 9), ('x:joe', 60), ('x:mary', 8)]))
+    p = beam.Pipeline('DirectPipelineRunner')
+    data = p | beam.Create('create', self.SAMPLE_RECORDS)
+    result = (data
+              | beam.Map('get players', group_with_coder.get_players)
+              | beam.CombinePerKey(sum)
+              | beam.Map(lambda (k, v): '%s,%d' % (k.name, v)))
+    assert_that(result, equal_to(
+        ['x:ann,15', 'x:fred,9', 'x:joe,60', 'x:mary,8']))
+    p.run()
 
-  def test_basics_without_type_check(self):
-    # Run the workflow without --pipeline_type_check option. This will make sure
+  @OptionsContext(pipeline_type_check=False)
+  def test_basics_without_type_check(self):
+    # Run the workflow without pipeline_type_check option. This will make sure
     # the typehints associated with all transforms will have default values and
     # therefore any custom coders will not be used. The default coder (pickler)
     # will be used instead.
-    temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
-    group_with_coder.run([
-        '--no_pipeline_type_check',
-        '--input=%s*' % temp_path,
-        '--output=%s.result' % temp_path])
-    # Parse result file and compare.
-    results = []
-    with open(temp_path + '.result-00000-of-00001') as result_file:
-      for line in result_file:
-        name, points = line.split(',')
-        results.append((name, int(points)))
-      logging.info('result: %s', results)
-    self.assertEqual(
-        sorted(results),
-        sorted([('ann', 15), ('fred', 9), ('joe', 60), ('mary', 8)]))
-
+    p = beam.Pipeline('DirectPipelineRunner')
+    data = p | beam.Create('create', self.SAMPLE_RECORDS)
+    result = (data
+              | beam.Map('get players', group_with_coder.get_players)
+              | beam.CombinePerKey(sum)
+              | beam.Map(lambda (k, v): '%s,%d' % (k.name, v)))
+    assert_that(result, equal_to(
+        ['ann,15', 'fred,9', 'joe,60', 'mary,8']))
+    p.run()
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

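Note: the OptionsContext decorator stands in for the removed
--pipeline_type_check / --no_pipeline_type_check flags, applying the given
pipeline options within the decorated test. A minimal sketch of the
pattern; the test body is illustrative:

    from apache_beam.utils.options import OptionsContext

    @OptionsContext(pipeline_type_check=True)
    def test_with_type_check(self):
      p = beam.Pipeline('DirectPipelineRunner')  # constructed under the option
      # ... build the pipeline and assert_that as above ...
      p.run()
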
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index b7a121d..59f4338 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -20,6 +20,8 @@
 
 from __future__ import absolute_import
 
+import collections
+
 from apache_beam.pvalue import AsIter as AllOf
 from apache_beam.transforms.core import CombinePerKey, Create, Flatten, GroupByKey, Map
 from apache_beam.transforms.ptransform import PTransform
@@ -35,6 +37,7 @@ __all__ = [
     'assert_that',
     'equal_to',
     'is_empty',
+    'contains_in_any_order',
     ]
 
 
@@ -196,6 +199,16 @@ def is_empty():
   return _empty
 
 
+def contains_in_any_order(expected):
+  def _contains_in_any_order(actual):
+    vs = collections.Counter(actual)
+    es = collections.Counter(expected)
+    if vs != es:
+      raise DataflowAssertException(
+          'Failed assert: extra: %s, missing: %s' % (vs - es, es - vs))
+  return _contains_in_any_order
+
+
 def assert_that(actual, matcher, label='assert_that'):
   """A PTransform that checks a PCollection has an expected value.
 

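Note: the Counter-based comparison reports the mismatch in both directions,
which makes failures easier to read than a plain inequality. A quick
plain-Python illustration of the diffs it builds (no pipeline involved):

    import collections

    vs = collections.Counter(['a', 'a', 'b'])  # actual
    es = collections.Counter(['a', 'b', 'b'])  # expected
    print vs - es  # Counter({'a': 1})  -> extra
    print es - vs  # Counter({'b': 1})  -> missing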