Remove label argument from WindowInto and other transforms
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/44d6501d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/44d6501d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/44d6501d Branch: refs/heads/master Commit: 44d6501dc73e1aa46a5b1138e92c8aa0a8bfe3f5 Parents: 5b0d883 Author: Sourabh Bajaj <sourabhba...@google.com> Authored: Fri Feb 10 11:56:50 2017 -0800 Committer: Ahmet Altay <al...@google.com> Committed: Fri Feb 10 16:59:50 2017 -0800 ---------------------------------------------------------------------- .../examples/cookbook/bigquery_schema.py | 3 +- .../examples/cookbook/bigquery_tornadoes.py | 3 +- .../apache_beam/examples/snippets/snippets.py | 21 +++---- .../apache_beam/examples/streaming_wordcount.py | 8 +-- sdks/python/apache_beam/io/iobase.py | 20 ++----- sdks/python/apache_beam/io/textio_test.py | 4 +- sdks/python/apache_beam/pvalue.py | 25 -------- sdks/python/apache_beam/pvalue_test.py | 6 +- .../runners/dataflow/native_io/iobase.py | 10 +--- .../consumer_tracking_pipeline_visitor_test.py | 9 +-- sdks/python/apache_beam/transforms/core.py | 16 ++--- .../python/apache_beam/transforms/ptransform.py | 61 +------------------- .../apache_beam/transforms/sideinputs_test.py | 4 +- sdks/python/apache_beam/transforms/util.py | 4 +- 14 files changed, 42 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py index 650a886..98aea05 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py @@ -113,8 +113,7 @@ def run(argv=None): # pylint: 
disable=expression-not-assigned record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5']) records = record_ids | 'CreateRecords' >> beam.Map(create_random_record) - records | beam.io.Write( - 'write', + records | 'write' >> beam.io.Write( beam.io.BigQuerySink( known_args.output, schema=table_schema, http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py index 6e1326c..6d79216 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py @@ -82,8 +82,7 @@ def run(argv=None): # Write the output using a "Write" transform that has side effects. # pylint: disable=expression-not-assigned - counts | beam.io.Write( - 'write', + counts | 'write' >> beam.io.Write( beam.io.BigQuerySink( known_args.output, schema='month:INTEGER, tornado_count:INTEGER', http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index f15a089..9ba46cd 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -793,8 +793,8 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform, # [START model_custom_sink_new_ptransform] class WriteToKVSink(PTransform): - def __init__(self, label, url, final_table_name, **kwargs): - super(WriteToKVSink, self).__init__(label, **kwargs) + def __init__(self, url, final_table_name, **kwargs): + super(WriteToKVSink, self).__init__(**kwargs) self._url = url 
self._final_table_name = final_table_name @@ -808,8 +808,8 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform, # [START model_custom_sink_use_ptransform] p = beam.Pipeline(options=PipelineOptions()) kvs = p | 'CreateKVs' >> beam.core.Create(KVs) - kvs | WriteToKVSink('WriteToSimpleKV', - 'http://url_to_simple_kv/', final_table_name) + kvs | 'WriteToSimpleKV' >> WriteToKVSink( + 'http://url_to_simple_kv/', final_table_name) # [END model_custom_sink_use_ptransform] p.run().wait_until_finish() @@ -903,24 +903,21 @@ def model_bigqueryio(): # [START model_bigqueryio_read] p = beam.Pipeline(options=PipelineOptions()) - weather_data = p | beam.io.Read( - 'ReadWeatherStations', + weather_data = p | 'ReadWeatherStations' >> beam.io.Read( beam.io.BigQuerySource( 'clouddataflow-readonly:samples.weather_stations')) # [END model_bigqueryio_read] # [START model_bigqueryio_query] p = beam.Pipeline(options=PipelineOptions()) - weather_data = p | beam.io.Read( - 'ReadYearAndTemp', + weather_data = p | 'ReadYearAndTemp' >> beam.io.Read( beam.io.BigQuerySource( query='SELECT year, mean_temp FROM samples.weather_stations')) # [END model_bigqueryio_query] # [START model_bigqueryio_query_standard_sql] p = beam.Pipeline(options=PipelineOptions()) - weather_data = p | beam.io.Read( - 'ReadYearAndTemp', + weather_data = p | 'ReadYearAndTemp' >> beam.io.Read( beam.io.BigQuerySource( query='SELECT year, mean_temp FROM `samples.weather_stations`', use_standard_sql=True)) @@ -933,8 +930,8 @@ def model_bigqueryio(): # [START model_bigqueryio_write] quotes = p | beam.Create( [{'source': 'Mahatma Ghandi', 'quote': 'My life is my message.'}]) - quotes | beam.io.Write( - 'Write', beam.io.BigQuerySink( + quotes | 'Write' >> beam.io.Write( + beam.io.BigQuerySink( 'my-project:output.output_table', schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, 
http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/examples/streaming_wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index 7fb2c81..4b6aecc 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -47,8 +47,8 @@ def run(argv=None): p = beam.Pipeline(argv=pipeline_args) # Read the text file[pattern] into a PCollection. - lines = p | beam.io.Read( - 'read', beam.io.PubSubSource(known_args.input_topic)) + lines = p | 'read' >> beam.io.Read( + beam.io.PubSubSource(known_args.input_topic)) # Capitalize the characters in each line. transformed = (lines @@ -63,8 +63,8 @@ def run(argv=None): # Write to PubSub. # pylint: disable=expression-not-assigned - transformed | beam.io.Write( - 'pubsub_write', beam.io.PubSubSink(known_args.output_topic)) + transformed | 'pubsub_write' >> beam.io.Write( + beam.io.PubSubSink(known_args.output_topic)) p.run().wait_until_finish() http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 3a4aec9..a2153f4 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -645,17 +645,13 @@ class Writer(object): class Read(ptransform.PTransform): """A transform that reads a PCollection.""" - def __init__(self, *args, **kwargs): + def __init__(self, source): """Initializes a Read transform. Args: - *args: A tuple of position arguments. - **kwargs: A dictionary of keyword arguments. - - The *args, **kwargs are expected to be (label, source) or (source). + source: Data source to read from. 
""" - label, source = self.parse_label_and_arg(args, kwargs, 'source') - super(Read, self).__init__(label) + super(Read, self).__init__() self.source = source def expand(self, pbegin): @@ -706,17 +702,13 @@ class Write(ptransform.PTransform): native write transform. """ - def __init__(self, *args, **kwargs): + def __init__(self, sink): """Initializes a Write transform. Args: - *args: A tuple of position arguments. - **kwargs: A dictionary of keyword arguments. - - The *args, **kwargs are expected to be (label, sink) or (sink). + sink: Data sink to write to. """ - label, sink = self.parse_label_and_arg(args, kwargs, 'sink') - super(Write, self).__init__(label) + super(Write, self).__init__() self.sink = sink def display_data(self): http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/io/textio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index ea417b0..0dedc95 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -668,7 +668,7 @@ class TextSinkTest(_TestCaseWithTempDirCleanUp): def test_write_dataflow_auto_compression_unsharded(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) pcoll | 'Write' >> WriteToText( # pylint: disable=expression-not-assigned self.path + '.gz', shard_name_template='') @@ -684,7 +684,7 @@ class TextSinkTest(_TestCaseWithTempDirCleanUp): def test_write_dataflow_header(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) header_text = 'foo' pcoll | 'Write' >> WriteToText( # pylint: disable=expression-not-assigned self.path + '.gz', http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/pvalue.py 
---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index c87728c..6dc67b0 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -310,27 +310,6 @@ def _cache_view(pipeline, key, view): pipeline._view_cache[key] = view # pylint: disable=protected-access -def can_take_label_as_first_argument(callee): - """Decorator to allow the "label" kwarg to be passed as the first argument. - - For example, since AsSingleton is annotated with this decorator, this allows - the call "AsSingleton(pcoll, label='label1')" to be written more succinctly - as "AsSingleton('label1', pcoll)". - - Args: - callee: The callable to be called with an optional label argument. - - Returns: - Callable that allows (but does not require) a string label as its first - argument. - """ - def _inner(maybe_label, *args, **kwargs): - if isinstance(maybe_label, basestring): - return callee(*args, label=maybe_label, **kwargs) - return callee(*((maybe_label,) + args), **kwargs) - return _inner - - def _format_view_label(pcoll): # The monitoring UI doesn't like '/' character in transform labels. if not pcoll.producer: @@ -342,7 +321,6 @@ def _format_view_label(pcoll): _SINGLETON_NO_DEFAULT = object() -@can_take_label_as_first_argument def AsSingleton(pcoll, default_value=_SINGLETON_NO_DEFAULT, label=None): # pylint: disable=invalid-name """Create a SingletonPCollectionView from the contents of input PCollection. @@ -386,7 +364,6 @@ def AsSingleton(pcoll, default_value=_SINGLETON_NO_DEFAULT, label=None): # pyli return view -@can_take_label_as_first_argument def AsIter(pcoll, label=None): # pylint: disable=invalid-name """Create an IterablePCollectionView from the elements of input PCollection. 
@@ -417,7 +394,6 @@ def AsIter(pcoll, label=None): # pylint: disable=invalid-name return view -@can_take_label_as_first_argument def AsList(pcoll, label=None): # pylint: disable=invalid-name """Create a ListPCollectionView from the elements of input PCollection. @@ -448,7 +424,6 @@ def AsList(pcoll, label=None): # pylint: disable=invalid-name return view -@can_take_label_as_first_argument def AsDict(pcoll, label=None): # pylint: disable=invalid-name """Create a DictPCollectionView from the elements of input PCollection. http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/pvalue_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py index 6748f17..6b4b663 100644 --- a/sdks/python/apache_beam/pvalue_test.py +++ b/sdks/python/apache_beam/pvalue_test.py @@ -49,10 +49,10 @@ class PValueTest(unittest.TestCase): value2 = pipeline | 'create2' >> Create([(1, 1), (2, 2), (3, 3)]) value3 = pipeline | 'create3' >> Create([(1, 1), (2, 2), (3, 3)]) self.assertEqual(AsSingleton(value), AsSingleton(value)) - self.assertEqual(AsSingleton('new', value, default_value=1), - AsSingleton('new', value, default_value=1)) + self.assertEqual(AsSingleton(value, default_value=1, label='new'), + AsSingleton(value, default_value=1, label='new')) self.assertNotEqual(AsSingleton(value), - AsSingleton('new', value, default_value=1)) + AsSingleton(value, default_value=1, label='new')) self.assertEqual(AsIter(value), AsIter(value)) self.assertEqual(AsList(value), AsList(value)) self.assertEqual(AsDict(value2), AsDict(value2)) http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py index 
b6eb288..529d414 100644 --- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py +++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py @@ -293,17 +293,13 @@ class _NativeWrite(ptransform.PTransform): Applying this transform results in a ``pvalue.PDone``. """ - def __init__(self, *args, **kwargs): + def __init__(self, sink): """Initializes a Write transform. Args: - *args: A tuple of position arguments. - **kwargs: A dictionary of keyword arguments. - - The *args, **kwargs are expected to be (label, sink) or (sink). + sink: Sink to use for the write """ - label, sink = self.parse_label_and_arg(args, kwargs, 'sink') - super(_NativeWrite, self).__init__(label) + super(_NativeWrite, self).__init__() self.sink = sink def expand(self, pcoll): http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py index 126a7de..73b897f 100644 --- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py +++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py @@ -51,12 +51,12 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): class DummySource(iobase.BoundedSource): pass - root_read = Read('read', DummySource()) + root_read = Read(DummySource()) root_flatten = Flatten(pipeline=self.pipeline) pbegin = pvalue.PBegin(self.pipeline) pcoll_create = pbegin | 'create' >> root_create - pbegin | root_read + pbegin | 'read' >> root_read pcoll_create | FlatMap(lambda x: x) [] | 'flatten' >> root_flatten @@ -64,10 +64,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): root_transforms = sorted( [t.transform for t in self.visitor.root_transforms]) - print 
root_transforms - print root_read - print root_create - print root_flatten + self.assertEqual(root_transforms, sorted( [root_read, root_create, root_flatten])) http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 2557b7e..2efe38d 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -578,7 +578,7 @@ class ParDo(PTransformWithSideInputs): Args: pcoll: a PCollection to be processed. - dofn: a DoFn object to be applied to each element of pcoll argument. + fn: a DoFn object to be applied to each element of pcoll argument. *args: positional arguments passed to the dofn object. **kwargs: keyword arguments passed to the dofn object. @@ -1126,8 +1126,8 @@ class GroupByKey(PTransform): class GroupByKeyOnly(PTransform): """A group by key transform, ignoring windows.""" - def __init__(self, label=None): - super(GroupByKeyOnly, self).__init__(label) + def __init__(self): + super(GroupByKeyOnly, self).__init__() def infer_output_type(self, input_type): key_type, value_type = trivial_inference.key_value_types(input_type) @@ -1235,25 +1235,19 @@ class WindowInto(ParDo): new_windows = self.windowing.windowfn.assign(context) yield WindowedValue(element, context.timestamp, new_windows) - def __init__(self, *args, **kwargs): + def __init__(self, windowfn, *args, **kwargs): """Initializes a WindowInto transform. Args: - *args: A tuple of position arguments. - **kwargs: A dictionary of keyword arguments. - - The *args, **kwargs are expected to be (label, windowfn) or (windowfn). - The optional trigger and accumulation_mode kwargs may also be provided. 
+ windowfn: Function to be used for windowing """ triggerfn = kwargs.pop('trigger', None) accumulation_mode = kwargs.pop('accumulation_mode', None) output_time_fn = kwargs.pop('output_time_fn', None) - label, windowfn = self.parse_label_and_arg(args, kwargs, 'windowfn') self.windowing = Windowing(windowfn, triggerfn, accumulation_mode, output_time_fn) dofn = self.WindowIntoFn(self.windowing) super(WindowInto, self).__init__(dofn) - self.label = label def get_windowing(self, unused_inputs): return self.windowing http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 994c09b..aca5822 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -204,65 +204,6 @@ class PTransform(WithTypeHints, HasDisplayData): def default_label(self): return self.__class__.__name__ - @classmethod - def parse_label_and_arg(cls, args, kwargs, arg_name): - """Parses a tuple of positional arguments into label, arg_name. - - The function is used by functions that take a (label, arg_name) list of - parameters and in which first label could be optional even if the arg_name - is not passed as a keyword. More specifically the following calling patterns - are allowed:: - - (value) - ('label', value) - (arg_name=value) - ('label', arg_name=value) - (value, label='label') - (label='label', arg_name=value) - - Args: - args: A tuple of position arguments. - kwargs: A dictionary of keyword arguments. - arg_name: The name of the second argument. - - Returns: - A (label, value) tuple. The label will be the one passed in or one - derived from the class name. The value will the corresponding value for - the arg_name argument. 
- - Raises: - ValueError: If the label and value cannot be deduced from args and kwargs - and also if the label is not a string. - """ - # TODO(robertwb): Fix to not silently drop extra arguments. - kw_label = kwargs.get('label', None) - kw_value = kwargs.get(arg_name, None) - - if kw_value is not None: - value = kw_value - else: - value = args[1] if len(args) > 1 else args[0] if args else None - - if kw_label is not None: - label = kw_label - else: - # We need to get a label from positional arguments. If we did not get a - # keyword value for the arg_name either then expect that a one element - # list will provide the value and the label will be derived from the class - # name. - num_args = len(args) - if kw_value is None: - label = args[0] if num_args >= 2 else cls.__name__ - else: - label = args[0] if num_args >= 1 else cls.__name__ - - if label is None or value is None or not isinstance(label, basestring): - raise ValueError( - '%s expects a (label, %s) or (%s) argument list ' - 'instead of args=%s, kwargs=%s' % ( - cls.__name__, arg_name, arg_name, args, kwargs)) - return label, value - def with_input_types(self, input_type_hint): """Annotates the input type of a PTransform with a type-hint. 
@@ -633,9 +574,9 @@ class CallablePTransform(PTransform): return res def __call__(self, *args, **kwargs): + super(CallablePTransform, self).__init__() self._args = args self._kwargs = kwargs - super(CallablePTransform, self).__init__() return self def expand(self, pcoll): http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/transforms/sideinputs_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 9384e7b..4672709 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -250,8 +250,8 @@ class SideInputsTest(unittest.TestCase): side_list = pipeline | 'side list' >> beam.Create(a_list) results = main_input | beam.FlatMap( lambda x, s1, s2: [[x, s1, s2]], - beam.pvalue.AsSingleton('si1', side_list, default_value=2), - beam.pvalue.AsSingleton('si2', side_list, default_value=3)) + beam.pvalue.AsSingleton(side_list, default_value=2, label='si1'), + beam.pvalue.AsSingleton(side_list, default_value=3, label='si2')) def matcher(expected_elem, expected_singleton1, expected_singleton2): def match(actual): http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/transforms/util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 1f691ca..56902dd 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -87,8 +87,8 @@ class CoGroupByKey(PTransform): to provide pipeline information, and should be considered mandatory. 
""" - def __init__(self, label=None, **kwargs): - super(CoGroupByKey, self).__init__(label) + def __init__(self, **kwargs): + super(CoGroupByKey, self).__init__() self.pipeline = kwargs.pop('pipeline', None) if kwargs: raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())