Repository: beam
Updated Branches:
  refs/heads/master fd40d4b29 -> 223dbb449
Add a cloud-pubsub dependency to the list of gcp extra packages

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef19024d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef19024d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef19024d

Branch: refs/heads/master
Commit: ef19024d2e9dc046c6699aeee1edc483beb9a009
Parents: fd40d4b
Author: Ahmet Altay <al...@google.com>
Authored: Tue Jun 20 14:25:55 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Jun 20 16:43:16 2017 -0700

----------------------------------------------------------------------
 .../python/apache_beam/examples/streaming_wordcount.py | 13 ++++++++-----
 sdks/python/setup.py                                   | 1 +
 2 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/ef19024d/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index ed8b5d0..f2b179a 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -25,16 +25,19 @@ from __future__ import absolute_import
 
 import argparse
 import logging
-import re
 
 
 import apache_beam as beam
 import apache_beam.transforms.window as window
 
 
+def split_fn(lines):
+  import re
+  return re.findall(r'[A-Za-z\']+', x)
+
+
 def run(argv=None):
   """Build and run the pipeline."""
-
   parser = argparse.ArgumentParser()
   parser.add_argument(
       '--input_topic', required=True,
@@ -46,14 +49,14 @@ def run(argv=None):
 
   with beam.Pipeline(argv=pipeline_args) as p:
 
-    # Read the text file[pattern] into a PCollection.
+    # Read from PubSub into a PCollection.
     lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
 
     # Capitalize the characters in each line.
     transformed = (lines
+                   # Use a pre-defined function that imports the re package.
                    | 'Split' >> (
-                       beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-                       .with_output_types(unicode))
+                       beam.FlatMap(split_fn).with_output_types(unicode))
                    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                    | beam.WindowInto(window.FixedWindows(15, 0))
                    | 'Group' >> beam.GroupByKey()

http://git-wip-us.apache.org/repos/asf/beam/blob/ef19024d/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 051043b..584c852 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -118,6 +118,7 @@ GCP_REQUIREMENTS = [
   'google-apitools>=0.5.10,<=0.5.11',
   'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4',
   'googledatastore==7.0.1',
+  'google-cloud-pubsub==0.25.0',
   # GCP packages required by tests
   'google-cloud-bigquery>=0.23.0,<0.25.0',
 ]