Repository: beam
Updated Branches:
  refs/heads/python-sdk f22fb9ce5 -> a779ac15d
Updates Python SDK examples to use Beam text source/sink.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87bed83b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87bed83b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87bed83b

Branch: refs/heads/python-sdk
Commit: 87bed83bc96e73003f05ed299e08b558ac5d1e37
Parents: f22fb9c
Author: Chamikara Jayalath <[email protected]>
Authored: Mon Jan 2 23:21:05 2017 -0800
Committer: Robert Bradshaw <[email protected]>
Committed: Tue Jan 3 10:42:02 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/complete/autocomplete.py | 6 ++++--
 .../apache_beam/examples/complete/estimate_pi.py | 4 ++--
 .../examples/complete/juliaset/juliaset/juliaset.py | 3 ++-
 sdks/python/apache_beam/examples/complete/tfidf.py | 6 ++++--
 .../examples/complete/top_wikipedia_sessions.py | 6 ++++--
 .../examples/cookbook/bigquery_side_input.py | 3 ++-
 .../apache_beam/examples/cookbook/bigshuffle.py | 16 ++++++++--------
 sdks/python/apache_beam/examples/cookbook/coders.py | 8 ++++----
 .../examples/cookbook/custom_ptransform.py | 11 ++++++-----
 .../examples/cookbook/datastore_wordcount.py | 3 ++-
 .../examples/cookbook/group_with_coder.py | 6 ++++--
 .../apache_beam/examples/cookbook/mergecontacts.py | 16 +++++++---------
 .../examples/cookbook/multiple_output_pardo.py | 14 +++++++-------
 sdks/python/apache_beam/examples/wordcount.py | 6 ++++--
 .../apache_beam/examples/wordcount_debugging.py | 6 ++++--
 .../apache_beam/examples/wordcount_minimal.py | 6 ++++--
 .../consumer_tracking_pipeline_visitor_test.py | 8 ++++++--
 17 files changed, 74 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 87e6c0c..3f2a7ae 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -24,6 +24,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -45,12 +47,12 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+   | 'read' >> ReadFromText(known_args.input)
    | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
    | 'TopPerPrefix' >> TopPerPrefix(5)
    | 'format' >> beam.Map(
       lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
-   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | 'write' >> WriteToText(known_args.output))
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index d0faefe..11081a6 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -33,6 +33,7 @@ import random
 
 import apache_beam as beam
+from apache_beam.io import WriteToText
 from apache_beam.typehints import Any
 from apache_beam.typehints import Iterable
 from apache_beam.typehints import Tuple
@@ -113,8 +114,7 @@ def run(argv=None):
 
   (p  # pylint: disable=expression-not-assigned
    | EstimatePiTransform()
-   | beam.io.Write(beam.io.TextFileSink(known_args.output,
-                                        coder=JsonCoder())))
+   | WriteToText(known_args.output, coder=JsonCoder()))
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 1445fbe..45fc1fb 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -25,6 +25,7 @@ from __future__ import absolute_import
 import argparse
 
 import apache_beam as beam
+from apache_beam.io import WriteToText
 
 
 def from_pixel(x, y, n):
@@ -110,7 +111,7 @@ def run(argv=None):  # pylint: disable=missing-docstring
      | 'x coord' >> beam.GroupByKey()
      | 'format' >> beam.Map(
          lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
-     | beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
+     | WriteToText(known_args.coordinate_output))
     # pylint: enable=expression-not-assigned
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index b4d5b45..59b9d6f 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -29,6 +29,8 @@ import math
 import re
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.pvalue import AsSingleton
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
@@ -40,7 +42,7 @@ def read_documents(pipeline, uris):
   for uri in uris:
     pcolls.append(
         pipeline
-        | beam.io.Read('read: %s' % uri, beam.io.TextFileSource(uri))
+        | 'read: %s' % uri >> ReadFromText(uri)
         | beam.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri))
   return pcolls | 'flatten read pcolls' >> beam.Flatten()
 
@@ -197,7 +199,7 @@ def run(argv=None):
   output = pcoll | TfIdf()
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+  output | 'write' >> WriteToText(known_args.output)
 
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index cbd305a..2dea642 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -46,6 +46,8 @@ import logging
 
 import apache_beam as beam
 from apache_beam import combiners
 from apache_beam import window
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
@@ -168,9 +170,9 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | beam.Read(beam.io.TextFileSource(known_args.input))
+   | ReadFromText(known_args.input)
    | ComputeTopSessions(known_args.sampling_threshold)
-   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | WriteToText(known_args.output))
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index 25e2c3b..7c5784b 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -33,6 +33,7 @@ from random import randrange
 
 import apache_beam as beam
 
+from apache_beam.io import WriteToText
 from apache_beam.pvalue import AsList
 from apache_beam.pvalue import AsSingleton
 from apache_beam.utils.pipeline_options import PipelineOptions
@@ -113,7 +114,7 @@ def run(argv=None):
                              pcoll_ignore_corpus, pcoll_ignore_word)
 
   # pylint:disable=expression-not-assigned
-  pcoll_groups | beam.io.Write(beam.io.TextFileSink(known_args.output))
+  pcoll_groups | WriteToText(known_args.output)
 
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index 83d3881..ceeefd6 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -25,6 +25,8 @@ import logging
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -53,9 +55,7 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | beam.io.Read(
-      beam.io.TextFileSource(known_args.input,
-                             coder=beam.coders.BytesCoder()))
+  lines = p | ReadFromText(known_args.input, coder=beam.coders.BytesCoder())
 
   # Count the occurrences of each word.
   output = (lines
@@ -68,7 +68,7 @@ def run(argv=None):
                  lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
 
   # Write the output using a "Write" transform that has side effects.
-  output | beam.io.Write(beam.io.TextFileSink(known_args.output))
+  output | WriteToText(known_args.output)
 
   # Optionally write the input and output checksums.
   if known_args.checksum_output:
@@ -76,15 +76,15 @@ def run(argv=None):
                   | 'input-csum' >> beam.Map(crc32line)
                   | 'combine-input-csum' >> beam.CombineGlobally(sum)
                   | 'hex-format' >> beam.Map(lambda x: '%x' % x))
-    input_csum | 'write-input-csum' >> beam.io.Write(
-        beam.io.TextFileSink(known_args.checksum_output + '-input'))
+    input_csum | 'write-input-csum' >> WriteToText(
+        known_args.checksum_output + '-input')
 
     output_csum = (output
                    | 'output-csum' >> beam.Map(crc32line)
                    | 'combine-output-csum' >> beam.CombineGlobally(sum)
                    | 'hex-format-output' >> beam.Map(lambda x: '%x' % x))
-    output_csum | 'write-output-csum' >> beam.io.Write(
-        beam.io.TextFileSink(known_args.checksum_output + '-output'))
+    output_csum | 'write-output-csum' >> WriteToText(
+        known_args.checksum_output + '-output')
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index 690ba66..ede9d70 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -35,6 +35,8 @@ import json
 import logging
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -87,12 +89,10 @@ def run(argv=None):
   p = beam.Pipeline(argv=pipeline_args)
   (p  # pylint: disable=expression-not-assigned
-   | beam.io.Read('read',
-                  beam.io.TextFileSource(known_args.input, coder=JsonCoder()))
+   | 'read' >> ReadFromText(known_args.input, coder=JsonCoder())
    | 'points' >> beam.FlatMap(compute_points)
    | beam.CombinePerKey(sum)
-   | beam.io.Write('write',
-                   beam.io.TextFileSink(known_args.output, coder=JsonCoder())))
+   | 'write' >> WriteToText(known_args.output, coder=JsonCoder()))
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index 56259ed..ef6bc5a 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -26,7 +26,8 @@ import argparse
 import logging
 
 import apache_beam as beam
-
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 
 
@@ -66,9 +67,9 @@ def run_count2(known_args, options):
   """Runs the second example pipeline."""
   logging.info('Running second pipeline')
   p = beam.Pipeline(options=options)
-  (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
+  (p | ReadFromText(known_args.input)
    | Count2()  # pylint: disable=no-value-for-parameter
-   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | WriteToText(known_args.output))
   p.run()
 
 
@@ -93,9 +94,9 @@ def run_count3(known_args, options):
   """Runs the third example pipeline."""
   logging.info('Running third pipeline')
   p = beam.Pipeline(options=options)
-  (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
+  (p | ReadFromText(known_args.input)
   | Count3(2)  # pylint: disable=no-value-for-parameter
-   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | WriteToText(known_args.output))
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index dd34070..8f68fb4 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -71,6 +71,7 @@ from google.datastore.v1 import query_pb2
 from googledatastore import helper as datastore_helper, PropertyFilter
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
 from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
 from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
 from apache_beam.utils.pipeline_options import GoogleCloudOptions
@@ -133,7 +134,7 @@ def write_to_datastore(project, user_options, pipeline_options):
 
   # pylint: disable=expression-not-assigned
   (p
-   | 'read' >> beam.io.Read(beam.io.TextFileSource(user_options.input))
+   | 'read' >> ReadFromText(user_options.input)
    | 'create entity' >> beam.Map(
       EntityWrapper(user_options.namespace, user_options.kind,
                     user_options.ancestor).make_entity)


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index c4b8c59..78540d1 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -33,6 +33,8 @@ import sys
 
 import apache_beam as beam
 from apache_beam import coders
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.typehints import typehints
 from apache_beam.typehints.decorators import with_output_types
 from apache_beam.utils.pipeline_options import PipelineOptions
@@ -98,7 +100,7 @@ def run(argv=sys.argv[1:]):
   coders.registry.register_coder(Player, PlayerCoder)
 
   (p  # pylint: disable=expression-not-assigned
-   | beam.io.Read(beam.io.TextFileSource(known_args.input))
+   | ReadFromText(known_args.input)
   # The get_players function is annotated with a type hint above, so the type
   # system knows the output type of the following operation is a key-value pair
   # of a Player and an int. Please see the documentation for details on
@@ -111,7 +113,7 @@ def run(argv=sys.argv[1:]):
       # encode Player objects as keys for this combine operation.
       | beam.CombinePerKey(sum)
       | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
-      | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+      | WriteToText(known_args.output))
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 6602609..2475e02 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -36,6 +36,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -72,7 +74,7 @@ def run(argv=None, assert_results=None):
   # quotes/backslashes, and convert it a PCollection of (key, value) pairs.
   def read_kv_textfile(label, textfile):
     return (p
-            | beam.io.Read('read_%s' % label, textfile)
+            | 'read_%s' % label >> ReadFromText(textfile)
            | beam.Map('backslash_%s' % label,
                       lambda x: re.sub(r'\\', r'\\\\', x))
            | beam.Map('escape_quotes_%s' % label,
@@ -80,12 +82,9 @@ def run(argv=None, assert_results=None):
            | beam.Map('split_%s' % label, lambda x: re.split(r'\t+', x, 1)))
 
   # Read input databases.
-  email = read_kv_textfile('email',
-                           beam.io.TextFileSource(known_args.input_email))
-  phone = read_kv_textfile('phone',
-                           beam.io.TextFileSource(known_args.input_phone))
-  snailmail = read_kv_textfile('snailmail', beam.io.TextFileSource(
-      known_args.input_snailmail))
+  email = read_kv_textfile('email', known_args.input_email)
+  phone = read_kv_textfile('phone', known_args.input_phone)
+  snailmail = read_kv_textfile('snailmail', known_args.input_snailmail)
 
   # Group together all entries under the same name.
   grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey()
@@ -113,8 +112,7 @@ def run(argv=None, assert_results=None):
 
   # Write tab-delimited output.
   # pylint: disable=expression-not-assigned
-  tsv_lines | beam.io.Write('write_tsv',
-                            beam.io.TextFileSink(known_args.output_tsv))
+  tsv_lines | 'write_tsv' >> WriteToText(known_args.output_tsv)
 
   # TODO(silviuc): Move the assert_results logic to the unit test.
   if assert_results is not None:


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index dd91e74..3acebc6 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -54,6 +54,8 @@ import re
 
 import apache_beam as beam
 from apache_beam import pvalue
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -137,7 +139,7 @@ def run(argv=None):
   pipeline_options.view_as(SetupOptions).save_main_session = True
   p = beam.Pipeline(options=pipeline_options)
 
-  lines = p | beam.Read(beam.io.TextFileSource(known_args.input))
+  lines = p | ReadFromText(known_args.input)
 
   # with_outputs allows accessing the side outputs of a DoFn.
   split_lines_result = (lines
@@ -158,20 +160,18 @@ def run(argv=None):
    | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
    | beam.GroupByKey()
    | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
-   | 'write chars' >> beam.Write(
-       beam.io.TextFileSink(known_args.output + '-chars')))
+   | 'write chars' >> WriteToText(known_args.output + '-chars'))
 
   # pylint: disable=expression-not-assigned
   (short_words
   | 'count short words' >> CountWords()
-   | 'write short words' >> beam.Write(
-       beam.io.TextFileSink(known_args.output + '-short-words')))
+   | 'write short words' >> WriteToText(
+       known_args.output + '-short-words'))
 
   # pylint: disable=expression-not-assigned
   (words
   | 'count words' >> CountWords()
-   | 'write words' >> beam.Write(
-       beam.io.TextFileSink(known_args.output + '-words')))
+   | 'write words' >> WriteToText(known_args.output + '-words'))
 
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 51fc2eb..211211d 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -24,6 +24,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -76,7 +78,7 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+  lines = p | 'read' >> ReadFromText(known_args.input)
 
   # Count the occurrences of each word.
   counts = (lines
@@ -91,7 +93,7 @@ def run(argv=None):
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+  output | 'write' >> WriteToText(known_args.output)
 
   # Actually run the pipeline (all operations above are deferred).
   result = p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index bba09b4..14f379d 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -46,6 +46,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -126,7 +128,7 @@ def run(argv=None):
   # Read the text file[pattern] into a PCollection, count the occurrences of
   # each word and filter by a list of words.
   filtered_words = (
-      p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+      p | 'read' >> ReadFromText(known_args.input)
      | CountWords()
      | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
 
@@ -146,7 +148,7 @@ def run(argv=None):
   # pylint: disable=unused-variable
   output = (filtered_words
             | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
-            | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+            | 'write' >> WriteToText(known_args.output))
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index c02ec16..18595d0 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -51,6 +51,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -93,7 +95,7 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+  lines = p | 'read' >> ReadFromText(known_args.input)
 
   # Count the occurrences of each word.
   counts = (lines
@@ -108,7 +110,7 @@ def run(argv=None):
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+  output | 'write' >> WriteToText(known_args.output)
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index 6fc9d83..27cf1b6 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -21,8 +21,8 @@ import logging
 import unittest
 
 from apache_beam import pvalue
+from apache_beam.io import iobase
 from apache_beam.io import Read
-from apache_beam.io import TextFileSource
 from apache_beam.pipeline import Pipeline
 from apache_beam.pvalue import AsList
 from apache_beam.runners.direct import DirectRunner
@@ -47,7 +47,11 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
 
   def test_root_transforms(self):
     root_create = Create('create', [[1, 2, 3]])
-    root_read = Read('read', TextFileSource('/tmp/somefile'))
+
+    class DummySource(iobase.BoundedSource):
+      pass
+
+    root_read = Read('read', DummySource())
     root_flatten = Flatten('flatten', pipeline=self.pipeline)
 
     pbegin = pvalue.PBegin(self.pipeline)
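
Every example in this commit makes the same substitution: beam.io.Read(beam.io.TextFileSource(...)) becomes ReadFromText(...), and beam.io.Write(beam.io.TextFileSink(...)) becomes WriteToText(...). As a quick reference, here is a minimal sketch of a pipeline written against the new-style text IO; the input/output paths and step labels below are invented for illustration and are not taken from the commit:

    import re

    import apache_beam as beam
    from apache_beam import combiners
    from apache_beam.io import ReadFromText
    from apache_beam.io import WriteToText
    from apache_beam.utils.pipeline_options import PipelineOptions

    p = beam.Pipeline(options=PipelineOptions())
    (p  # pylint: disable=expression-not-assigned
     | 'read' >> ReadFromText('/tmp/input.txt')   # was: beam.io.Read(beam.io.TextFileSource(...))
     | 'split' >> beam.FlatMap(lambda line: re.findall(r'[A-Za-z\']+', line))
     | 'count' >> combiners.Count.PerElement()
     | 'format' >> beam.Map(lambda word_count: '%s: %d' % word_count)
     | 'write' >> WriteToText('/tmp/output'))     # was: beam.io.Write(beam.io.TextFileSink(...))
    p.run()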
