Repository: beam Updated Branches: refs/heads/master 42e3a6f85 -> 884935cb9
[BEAM-1695] Improve Python-SDK's programming guide Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9174ebf4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9174ebf4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9174ebf4 Branch: refs/heads/master Commit: 9174ebf48391f393761b2ce085d2f165ebb2c7bb Parents: 42e3a6f Author: Tibor Kiss <[email protected]> Authored: Thu Apr 27 14:12:13 2017 +0200 Committer: Ahmet Altay <[email protected]> Committed: Thu Apr 27 12:03:20 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/examples/snippets/snippets.py | 24 ++++++++++++-------- .../examples/snippets/snippets_test.py | 6 +++-- 2 files changed, 19 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9174ebf4/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index c566914..1b750b4 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -74,6 +74,11 @@ def construct_pipeline(renames): """A reverse words snippet as an example for constructing a pipeline.""" import re + # This is duplicate of the import statement in + # pipelines_constructing_creating tag below, but required to avoid + # Unresolved reference in ReverseWords class + import apache_beam as beam + class ReverseWords(beam.PTransform): """A PTransform that reverses individual elements in a PCollection.""" @@ -85,6 +90,7 @@ def construct_pipeline(renames): return True # [START pipelines_constructing_creating] + import apache_beam as beam from apache_beam.utils.pipeline_options import PipelineOptions p = beam.Pipeline(options=PipelineOptions()) @@ -172,16 +178,18 @@ def model_pcollection(argv): # [START model_pcollection] p = beam.Pipeline(options=pipeline_options) - (p - | beam.Create([ - 'To be, or not to be: that is the question: ', - 'Whether \'tis nobler in the mind to suffer ', - 'The slings and arrows of outrageous fortune, ', - 'Or to take arms against a sea of troubles, ']) + lines = (p + | beam.Create([ + 'To be, or not to be: that is the question: ', + 'Whether \'tis nobler in the mind to suffer ', + 'The slings and arrows of outrageous fortune, ', + 'Or to take arms against a sea of troubles, '])) + # [END model_pcollection] + + (lines | beam.io.WriteToText(my_options.output)) result = p.run() - # [END model_pcollection] result.wait_until_finish() @@ -1006,9 +1014,7 @@ def model_multiple_pcollections_flatten(contents, output_path): # types.) # [START model_multiple_pcollections_flatten] merged = ( - # [START model_multiple_pcollections_tuple] (pcoll1, pcoll2, pcoll3) - # [END model_multiple_pcollections_tuple] # A list of tuples can be "piped" directly into a Flatten transform. | beam.Flatten()) # [END model_multiple_pcollections_flatten] http://git-wip-us.apache.org/repos/asf/beam/blob/9174ebf4/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 2aee350..b8054ad 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -767,7 +767,7 @@ class CombineTest(unittest.TestCase): def test_custom_average(self): pc = [2, 3, 5, 7] - # [START combine_custom_average] + # [START combine_custom_average_define] class AverageFn(beam.CombineFn): def create_accumulator(self): return (0.0, 0) @@ -781,8 +781,10 @@ class CombineTest(unittest.TestCase): def extract_output(self, (sum, count)): return sum / count if count else float('NaN') + # [END combine_custom_average_define] + # [START combine_custom_average_execute] average = pc | beam.CombineGlobally(AverageFn()) - # [END combine_custom_average] + # [END combine_custom_average_execute] self.assertEqual([4.25], average) def test_keys(self):
