davidcavazos commented on a change in pull request #14738:
URL: https://github.com/apache/beam/pull/14738#discussion_r631291737
##########
File path: sdks/python/apache_beam/examples/snippets/snippets.py
##########
@@ -110,80 +115,71 @@ def filter_words(unused_x):
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
- with beam.Pipeline(options=PipelineOptions()) as p:
+ beam_options = PipelineOptions()
+ with beam.Pipeline(options=beam_options) as pipeline:
pass # build your pipeline here
# [END pipelines_constructing_creating]
- with TestPipeline() as p: # Use TestPipeline for testing.
- # pylint: disable=line-too-long
+ # [START pipelines_constructing_reading]
+ lines = pipeline | 'ReadMyFile' >> beam.io.ReadFromText(
+ 'gs://some/inputData.txt')
+ # [END pipelines_constructing_reading]
- # [START pipelines_constructing_reading]
- lines = p | 'ReadMyFile' >> beam.io.ReadFromText(
- 'gs://some/inputData.txt')
- # [END pipelines_constructing_reading]
+ # [START pipelines_constructing_applying]
+ words = lines | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+ reversed_words = words | ReverseWords()
+ # [END pipelines_constructing_applying]
- # [START pipelines_constructing_applying]
- words = lines | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
- reversed_words = words | ReverseWords()
- # [END pipelines_constructing_applying]
+ # [START pipelines_constructing_writing]
+ filtered_words = reversed_words | 'FilterWords' >>
beam.Filter(filter_words)
+ filtered_words | 'WriteMyFile' >> beam.io.WriteToText(
+ 'gs://some/outputData.txt')
+ # [END pipelines_constructing_writing]
- # [START pipelines_constructing_writing]
- filtered_words = reversed_words | 'FilterWords' >> beam.Filter(
- filter_words)
- filtered_words | 'WriteMyFile' >> beam.io.WriteToText(
- 'gs://some/outputData.txt')
- # [END pipelines_constructing_writing]
+ pipeline.visit(SnippetUtils.RenameFiles(renames))
- p.visit(SnippetUtils.RenameFiles(renames))
-
-def model_pipelines(argv):
+def model_pipelines():
"""A wordcount snippet as a simple pipeline example."""
# [START model_pipelines]
+ import argparse
import re
+ import sys
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
- class MyOptions(PipelineOptions):
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument(
- '--input',
- dest='input',
- default='gs://dataflow-samples/shakespeare/kinglear'
- '.txt',
- help='Input file to process.')
- parser.add_argument(
- '--output',
- dest='output',
- required=True,
- help='Output file to write results to.')
-
- pipeline_options = PipelineOptions(argv)
- my_options = pipeline_options.view_as(MyOptions)
-
- with beam.Pipeline(options=pipeline_options) as p:
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--input',
+ default='gs://dataflow-samples/shakespeare/kinglear.txt',
+ help='Input file to process.')
+ parser.add_argument(
+ '--output', required=True, help='Output file to write results to.')
+ args, beam_args = parser.parse_known_args(sys.argv)
+ beam_options = PipelineOptions(beam_args)
+ with beam.Pipeline(options=beam_options) as pipeline:
(
- p
- | beam.io.ReadFromText(my_options.input)
+ pipeline
+ | beam.io.ReadFromText(args.input)
| beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| beam.Map(lambda x: (x, 1))
| beam.combiners.Count.PerKey()
- | beam.io.WriteToText(my_options.output))
+ | beam.io.WriteToText(args.output))
# [END model_pipelines]
-def model_pcollection(argv):
+def model_pcollection(output_path):
"""Creating a PCollection from data in local memory."""
# [START model_pcollection]
+ import sys
+
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
- # argv = None # if None, uses sys.argv
- pipeline_options = PipelineOptions(argv)
- with beam.Pipeline(options=pipeline_options) as pipeline:
+ beam_options = PipelineOptions(sys.argv)
Review comment:
Thanks — it looks like I needed to add an unused first argument to
`test_argv`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org