Updates Python datastore wordcount example to take a dataset parameter.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ac436349
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ac436349
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ac436349
Branch: refs/heads/mr-runner
Commit: ac4363491e6c300cf34c05bf547cee3ccc37c98e
Parents: b013d7c
Author: [email protected] <[email protected]>
Authored: Tue Oct 31 18:37:29 2017 -0700
Committer: [email protected] <[email protected]>
Committed: Wed Nov 1 13:03:37 2017 -0700
----------------------------------------------------------------------
 .../examples/cookbook/datastore_wordcount.py | 24 +++++++++++---------
 .../io/gcp/datastore/v1/datastoreio.py | 16 +++++++++++--
 2 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/ac436349/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 099fb08..7204e3b 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -32,12 +32,14 @@ counts them and write the output to a set of files.
 
 The following options must be provided to run this pipeline in read-only mode:
 ``
---project YOUR_PROJECT_ID
+--dataset YOUR_DATASET
 --kind YOUR_DATASTORE_KIND
 --output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH]
 --read_only
 ``
 
+Dataset maps to Project ID for v1 version of datastore.
+
 Read-write Mode: In this mode, this example reads words from an input file,
 converts them to Cloud Datastore ``Entity`` objects and writes them to
 Cloud Datastore using the ``datastoreio.Write`` transform. The second pipeline
@@ -47,7 +49,7 @@ write the output to a set of files.
The following options must be provided to run this pipeline in read-write mode: `` ---project YOUR_PROJECT_ID +--dataset YOUR_DATASET --kind YOUR_DATASTORE_KIND --output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH] `` @@ -77,7 +79,6 @@ from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore from apache_beam.metrics import Metrics from apache_beam.metrics.metric import MetricsFilter -from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions @@ -134,7 +135,7 @@ class EntityWrapper(object): return entity -def write_to_datastore(project, user_options, pipeline_options): +def write_to_datastore(user_options, pipeline_options): """Creates a pipeline that writes entities to Cloud Datastore.""" with beam.Pipeline(options=pipeline_options) as p: @@ -144,7 +145,7 @@ def write_to_datastore(project, user_options, pipeline_options): | 'create entity' >> beam.Map( EntityWrapper(user_options.namespace, user_options.kind, user_options.ancestor).make_entity) - | 'write to datastore' >> WriteToDatastore(project)) + | 'write to datastore' >> WriteToDatastore(user_options.dataset)) def make_ancestor_query(kind, namespace, ancestor): @@ -167,7 +168,7 @@ def make_ancestor_query(kind, namespace, ancestor): return query -def read_from_datastore(project, user_options, pipeline_options): +def read_from_datastore(user_options, pipeline_options): """Creates a pipeline that reads entities from Cloud Datastore.""" p = beam.Pipeline(options=pipeline_options) # Create a query to read entities from datastore. @@ -176,7 +177,7 @@ def read_from_datastore(project, user_options, pipeline_options): # Read entities from Cloud Datastore into a PCollection. 
lines = p | 'read from datastore' >> ReadFromDatastore( - project, query, user_options.namespace) + user_options.dataset, query, user_options.namespace) # Count the occurrences of each word. def count_ones(word_ones): @@ -216,6 +217,9 @@ def run(argv=None): dest='input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Input file to process.') + parser.add_argument('--dataset', + dest='dataset', + help='Dataset ID to read from Cloud Datastore.') parser.add_argument('--kind', dest='kind', required=True, @@ -246,15 +250,13 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - gcloud_options = pipeline_options.view_as(GoogleCloudOptions) # Write to Datastore if `read_only` options is not specified. if not known_args.read_only: - write_to_datastore(gcloud_options.project, known_args, pipeline_options) + write_to_datastore(known_args, pipeline_options) # Read entities from Datastore. 
- result = read_from_datastore(gcloud_options.project, known_args, - pipeline_options) + result = read_from_datastore(known_args, pipeline_options) empty_lines_filter = MetricsFilter().with_name('empty_lines') query_result = result.metrics().query(empty_lines_filter) http://git-wip-us.apache.org/repos/asf/beam/blob/ac436349/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py index 078002c..13209c1 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py @@ -89,10 +89,10 @@ class ReadFromDatastore(PTransform): _DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024 def __init__(self, project, query, namespace=None, num_splits=0): - """Initialize the ReadFromDatastore transform. + """Initialize the `ReadFromDatastore` transform. Args: - project: The Project ID + project: The ID of the project to read from. query: Cloud Datastore query to be read from. namespace: An optional namespace. num_splits: Number of splits for the query. @@ -459,7 +459,13 @@ class _Mutate(PTransform): class WriteToDatastore(_Mutate): """A ``PTransform`` to write a ``PCollection[Entity]`` to Cloud Datastore.""" + def __init__(self, project): + """Initialize the `WriteToDatastore` transform. + + Args: + project: The ID of the project to write to. + """ # Import here to avoid adding the dependency for local running scenarios. try: @@ -486,6 +492,12 @@ class WriteToDatastore(_Mutate): class DeleteFromDatastore(_Mutate): """A ``PTransform`` to delete a ``PCollection[Key]`` from Cloud Datastore.""" def __init__(self, project): + """Initialize the `DeleteFromDatastore` transform. + + Args: + project: The ID of the project from which the entities will be deleted. 
+ """ + super(DeleteFromDatastore, self).__init__( project, DeleteFromDatastore.to_delete_mutation)
