Repository: incubator-beam Updated Branches: refs/heads/python-sdk d3c887480 -> e26527873
Rename PTransform.apply() to PTransform.expand() See https://issues.apache.org/jira/browse/BEAM-1125 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e62249a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e62249a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e62249a1 Branch: refs/heads/python-sdk Commit: e62249a1f0a170f7e16926a3f0e6bc25d1422c22 Parents: d3c8874 Author: Ahmet Altay <al...@google.com> Authored: Thu Dec 15 14:27:08 2016 -0800 Committer: Robert Bradshaw <rober...@google.com> Committed: Thu Dec 15 16:51:50 2016 -0800 ---------------------------------------------------------------------- sdks/python/README.md | 2 +- .../examples/complete/autocomplete.py | 2 +- .../examples/complete/estimate_pi.py | 2 +- .../apache_beam/examples/complete/tfidf.py | 2 +- .../examples/complete/top_wikipedia_sessions.py | 6 ++--- .../examples/cookbook/custom_ptransform.py | 2 +- .../examples/cookbook/multiple_output_pardo.py | 2 +- .../apache_beam/examples/snippets/snippets.py | 16 ++++++------- .../examples/snippets/snippets_test.py | 2 +- .../apache_beam/examples/wordcount_debugging.py | 2 +- sdks/python/apache_beam/io/avroio.py | 4 ++-- .../apache_beam/io/datastore/v1/datastoreio.py | 4 ++-- sdks/python/apache_beam/io/iobase.py | 6 ++--- sdks/python/apache_beam/io/textio.py | 4 ++-- sdks/python/apache_beam/pipeline_test.py | 4 ++-- .../runners/dataflow/native_io/iobase.py | 2 +- .../apache_beam/runners/direct/direct_runner.py | 2 +- sdks/python/apache_beam/runners/runner.py | 4 ++-- sdks/python/apache_beam/transforms/combiners.py | 14 ++++++------ .../apache_beam/transforms/combiners_test.py | 2 +- sdks/python/apache_beam/transforms/core.py | 24 ++++++++++---------- .../python/apache_beam/transforms/ptransform.py | 10 ++++---- .../apache_beam/transforms/ptransform_test.py | 6 ++--- .../python/apache_beam/transforms/sideinputs.py | 10 ++++---- sdks/python/apache_beam/transforms/util.py | 4 ++-- .../transforms/write_ptransform_test.py | 2 +- .../typehints/typed_pipeline_test.py | 2 +- 27 files changed, 71 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/README.md ---------------------------------------------------------------------- diff --git a/sdks/python/README.md b/sdks/python/README.md index 820084d..5ea2a60 100644 --- a/sdks/python/README.md +++ b/sdks/python/README.md @@ -262,7 +262,7 @@ import re import apache_beam as beam p = beam.Pipeline('DirectPipelineRunner') class MyCountTransform(beam.PTransform): - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll | 'one word' >> beam.Map(lambda word: (word, 1)) # GroupByKey accepts a PCollection of (word, 1) elements and http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/complete/autocomplete.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py index c3cd88f..eaa5ca2 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete.py @@ -60,7 +60,7 @@ class TopPerPrefix(beam.PTransform): super(TopPerPrefix, self).__init__() self._count = count - def apply(self, words): + def expand(self, words): """Compute the most common words for each possible prefixes. Args: http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/complete/estimate_pi.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py index 37c1aad..682c6d2 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py @@ -90,7 +90,7 @@ class JsonCoder(object): class EstimatePiTransform(beam.PTransform): """Runs 10M trials, and combine the results to estimate pi.""" - def apply(self, pcoll): + def expand(self, pcoll): # A hundred work items of a hundred thousand tries each. return (pcoll | 'Initialize' >> beam.Create([100000] * 100).with_output_types(int) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/complete/tfidf.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index 043d5f6..59b2900 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf.py +++ b/sdks/python/apache_beam/examples/complete/tfidf.py @@ -53,7 +53,7 @@ class TfIdf(beam.PTransform): The output is mapping from terms to scores for each document URI. """ - def apply(self, uri_to_content): + def expand(self, uri_to_content): # Compute the total number of documents, and prepare a singleton # PCollection to use as side input. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py index a48a383..2d66d7f 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py @@ -76,7 +76,7 @@ class ComputeSessions(beam.PTransform): def __init__(self): super(ComputeSessions, self).__init__() - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll | beam.WindowInto('ComputeSessionsWindow', window.Sessions(gap_size=ONE_HOUR_IN_SECONDS)) @@ -89,7 +89,7 @@ class TopPerMonth(beam.PTransform): def __init__(self): super(TopPerMonth, self).__init__() - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll | beam.WindowInto('TopPerMonthWindow', window.FixedWindows( @@ -127,7 +127,7 @@ class ComputeTopSessions(beam.PTransform): super(ComputeTopSessions, self).__init__() self.sampling_threshold = sampling_threshold - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll | beam.ParDo('ExtractUserAndTimestamp', ExtractUserAndTimestampDoFn()) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py index ca13bbf..b9d64cf 100644 --- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py +++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py @@ -36,7 +36,7 @@ from apache_beam.utils.options import PipelineOptions class Count1(beam.PTransform): """Count as a subclass of PTransform, with an apply method.""" - def apply(self, pcoll): + def expand(self, pcoll): return ( pcoll | 'ParWithOne' >> beam.Map(lambda v: (v, 1)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py index d24170e..167e709 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py @@ -112,7 +112,7 @@ class CountWords(beam.PTransform): of "word: count" strings. """ - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | 'group' >> beam.GroupByKey() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 6dcf05e..f78ecd8 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -80,7 +80,7 @@ def construct_pipeline(renames): class ReverseWords(beam.PTransform): """A PTransform that reverses individual elements in a PCollection.""" - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | beam.Map(lambda e: e[::-1]) def filter_words(unused_x): @@ -387,7 +387,7 @@ def pipeline_monitoring(renames): # The CountWords Composite Transform inside the WordCount pipeline. class CountWords(beam.PTransform): - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll # Convert lines of text into individual words. | 'ExtractWords' >> beam.ParDo(ExtractWordsFn()) @@ -508,7 +508,7 @@ def examples_wordcount_wordcount(renames): # [START examples_wordcount_wordcount_composite] class CountWords(beam.PTransform): - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll # Convert lines of text into individual words. | beam.FlatMap( @@ -705,7 +705,7 @@ def model_custom_source(count): super(ReadFromCountingSource, self).__init__(**kwargs) self._count = count - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | iobase.Read(_CountingSource(count)) # [END model_custom_source_new_ptransform] @@ -838,7 +838,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform, self._url = url self._final_table_name = final_table_name - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | iobase.Write(_SimpleKVSink(self._url, self._final_table_name)) # [END model_custom_sink_new_ptransform] @@ -1001,7 +1001,7 @@ def model_composite_transform_example(contents, output_path): class CountWords(beam.PTransform): # [END composite_ptransform_declare] - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll | beam.FlatMap(lambda x: re.findall(r'\w+', x)) | beam.combiners.Count.PerElement() @@ -1197,7 +1197,7 @@ def model_join_using_side_inputs( # [START model_library_transforms_keys] class Keys(beam.PTransform): - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | 'Keys' >> beam.Map(lambda (k, v): k) # [END model_library_transforms_keys] # pylint: enable=invalid-name @@ -1206,7 +1206,7 @@ class Keys(beam.PTransform): # [START model_library_transforms_count] class Count(beam.PTransform): - def apply(self, pcoll): + def expand(self, pcoll): return ( pcoll | 'PairWithOne' >> beam.Map(lambda v: (v, 1)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 09b4ba4..db2ea81 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -279,7 +279,7 @@ class TypeHintsTest(unittest.TestCase): @beam.typehints.with_input_types(T) @beam.typehints.with_output_types(beam.typehints.Tuple[int, T]) class MyTransform(beam.PTransform): - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | beam.Map(lambda x: (len(x), x)) words_with_lens = words | MyTransform() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/wordcount_debugging.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index 473a486..cdf4e0c 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -95,7 +95,7 @@ class CountWords(beam.PTransform): def __init__(self): super(CountWords, self).__init__() - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) .with_output_types(unicode)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/io/avroio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 35d0e94..3663bdb 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -77,7 +77,7 @@ class ReadFromAvro(PTransform): super(ReadFromAvro, self).__init__() self._source = _AvroSource(file_pattern, min_bundle_size, validate=validate) - def apply(self, pvalue): + def expand(self, pvalue): return pvalue.pipeline | Read(self._source) def display_data(self): @@ -294,7 +294,7 @@ class WriteToAvro(beam.transforms.PTransform): self._sink = _AvroSink(file_path_prefix, schema, codec, file_name_suffix, num_shards, shard_name_template, mime_type) - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | beam.io.iobase.Write(self._sink) def display_data(self): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/io/datastore/v1/datastoreio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py index a86bb0b..93c592d 100644 --- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py @@ -102,7 +102,7 @@ class ReadFromDatastore(PTransform): self._query = query self._num_splits = num_splits - def apply(self, pcoll): + def expand(self, pcoll): # This is a composite transform involves the following: # 1. Create a singleton of the user provided `query` and apply a ``ParDo`` # that splits the query into `num_splits` and assign each split query a @@ -312,7 +312,7 @@ class _Mutate(PTransform): self._mutation_fn = mutation_fn logging.warning('datastoreio write transform is experimental.') - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll | 'Convert to Mutation' >> Map(self._mutation_fn) | 'Write Mutation to Datastore' >> ParDo(_Mutate.DatastoreWriteFn( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index fd6ae57..8fb5238 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -658,7 +658,7 @@ class Read(ptransform.PTransform): super(Read, self).__init__(label) self.source = source - def apply(self, pbegin): + def expand(self, pbegin): assert isinstance(pbegin, pvalue.PBegin) self.pipeline = pbegin.pipeline return pvalue.PCollection(self.pipeline) @@ -723,7 +723,7 @@ class Write(ptransform.PTransform): return {'sink': self.sink.__class__, 'sink_dd': self.sink} - def apply(self, pcoll): + def expand(self, pcoll): from apache_beam.runners.dataflow.native_io import iobase as dataflow_io if isinstance(self.sink, dataflow_io.NativeSink): # A native sink @@ -746,7 +746,7 @@ class WriteImpl(ptransform.PTransform): super(WriteImpl, self).__init__() self.sink = sink - def apply(self, pcoll): + def expand(self, pcoll): do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None]) init_result_coll = do_once | core.Map( 'initialize_write', lambda _, sink: sink.initialize_write(), self.sink) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/io/textio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index ebadf69..09cf024 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -239,7 +239,7 @@ class ReadFromText(PTransform): strip_trailing_newlines, coder, validate=validate) - def apply(self, pvalue): + def expand(self, pvalue): return pvalue.pipeline | Read(self._source) def display_data(self): @@ -297,7 +297,7 @@ class WriteToText(PTransform): append_trailing_newlines, num_shards, shard_name_template, coder, compression_type) - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | Write(self._sink) def display_data(self): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index c50f04d..5af4811 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -73,7 +73,7 @@ class PipelineTest(unittest.TestCase): class CustomTransform(PTransform): - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | '+1' >> FlatMap(lambda x: [x + 1]) class Visitor(PipelineVisitor): @@ -174,7 +174,7 @@ class PipelineTest(unittest.TestCase): # No call to super(...).__init__ self.suffix = suffix - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | Map(lambda x: x + self.suffix) self.assertEqual( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py index 32da3a2..b6eb288 100644 --- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py +++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py @@ -306,6 +306,6 @@ class _NativeWrite(ptransform.PTransform): super(_NativeWrite, self).__init__(label) self.sink = sink - def apply(self, pcoll): + def expand(self, pcoll): self._check_pcollection(pcoll) return pvalue.PDone(pcoll.pipeline) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/runners/direct/direct_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 1afd486..fa78902 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -92,7 +92,7 @@ class DirectPipelineRunner(PipelineRunner): def apply(self, transform, input): # pylint: disable=redefined-builtin """Runner callback for a pipeline.apply call.""" - return transform.apply(input) + return transform.expand(input) class BufferingInMemoryCache(object): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/runners/runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 0f53d65..ec15bee 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -148,8 +148,8 @@ class PipelineRunner(object): 'Execution of [%s] not implemented in runner %s.' % (transform, self)) def apply_PTransform(self, transform, input): - # The base case of apply is to call the transform's apply. - return transform.apply(input) + # The base case of apply is to call the transform's expand. + return transform.expand(input) def run_transform(self, transform_node): """Runner callback for a pipeline.run call. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/combiners.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index 22d2b3e..96fcddd 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -58,13 +58,13 @@ class Mean(object): class Globally(ptransform.PTransform): """combiners.Mean.Globally computes the arithmetic mean of the elements.""" - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | core.CombineGlobally(MeanCombineFn()) class PerKey(ptransform.PTransform): """combiners.Mean.PerKey finds the means of the values for each key.""" - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | core.CombinePerKey(MeanCombineFn()) @@ -105,19 +105,19 @@ class Count(object): class Globally(ptransform.PTransform): """combiners.Count.Globally counts the total number of elements.""" - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | core.CombineGlobally(CountCombineFn()) class PerKey(ptransform.PTransform): """combiners.Count.PerKey counts how many elements each unique key has.""" - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | core.CombinePerKey(CountCombineFn()) class PerElement(ptransform.PTransform): """combiners.Count.PerElement counts how many times each element occurs.""" - def apply(self, pcoll): + def expand(self, pcoll): paired_with_void_type = KV[pcoll.element_type, Any] return (pcoll | (core.Map('%s:PairWithVoid' % self.label, lambda x: (x, None)) @@ -475,7 +475,7 @@ class ToList(ptransform.PTransform): def __init__(self, label='ToList'): super(ToList, self).__init__(label) - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | core.CombineGlobally(self.label, ToListCombineFn()) @@ -509,7 +509,7 @@ class ToDict(ptransform.PTransform): def __init__(self, label='ToDict'): super(ToDict, self).__init__(label) - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | core.CombineGlobally(self.label, ToDictCombineFn()) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 8dc274e..6113ea2 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -318,7 +318,7 @@ class CombineTest(unittest.TestCase): def test_combine_globally_with_default_side_input(self): class CombineWithSideInput(PTransform): - def apply(self, pcoll): + def expand(self, pcoll): side = pcoll | CombineGlobally(sum).as_singleton_view() main = pcoll.pipeline | Create([None]) return main | Map(lambda _, s: s, side) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 523c5a6..0ba1c62 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -598,7 +598,7 @@ class ParDo(PTransformWithSideInputs): label='Transform Function'), 'fn_dd': self.fn} - def apply(self, pcoll): + def expand(self, pcoll): self.side_output_tags = set() # TODO(robertwb): Change all uses of the dofn attribute to use fn instead. self.dofn = self.fn @@ -641,7 +641,7 @@ class _MultiParDo(PTransform): self._tags = tags self._main_tag = main_tag - def apply(self, pcoll): + def expand(self, pcoll): _ = pcoll | self._do_transform return pvalue.DoOutputsTuple( pcoll.pipeline, self._do_transform, self._tags, self._main_tag) @@ -854,7 +854,7 @@ class CombineGlobally(PTransform): def as_singleton_view(self): return self.clone(as_view=True) - def apply(self, pcoll): + def expand(self, pcoll): def add_input_types(transform): type_hints = self.get_type_hints() if type_hints.input_types: @@ -939,7 +939,7 @@ class CombinePerKey(PTransformWithSideInputs): def process_argspec_fn(self): return self.fn._fn # pylint: disable=protected-access - def apply(self, pcoll): + def expand(self, pcoll): args, kwargs = util.insert_values_in_args( self.args, self.kwargs, self.side_inputs) return pcoll | GroupByKey() | CombineValues('Combine', @@ -952,7 +952,7 @@ class CombineValues(PTransformWithSideInputs): def make_fn(self, fn): return fn if isinstance(fn, CombineFn) else CombineFn.from_callable(fn) - def apply(self, pcoll): + def expand(self, pcoll): args, kwargs = util.insert_values_in_args( self.args, self.kwargs, self.side_inputs) @@ -1083,7 +1083,7 @@ class GroupByKey(PTransform): timer_window, name, time_domain, fire_time, state): yield wvalue.with_value((k, wvalue.value)) - def apply(self, pcoll): + def expand(self, pcoll): # This code path is only used in the local direct runner. For Dataflow # runner execution, the GroupByKey transform is expanded on the service. input_type = pcoll.element_type @@ -1132,7 +1132,7 @@ class GroupByKeyOnly(PTransform): key_type, value_type = trivial_inference.key_value_types(input_type) return KV[key_type, Iterable[value_type]] - def apply(self, pcoll): + def expand(self, pcoll): self._check_pcollection(pcoll) return pvalue.PCollection(pcoll.pipeline) @@ -1170,7 +1170,7 @@ class Partition(PTransformWithSideInputs): def make_fn(self, fn): return fn if isinstance(fn, PartitionFn) else CallableWrapperPartitionFn(fn) - def apply(self, pcoll): + def expand(self, pcoll): n = int(self.args[0]) return pcoll | ParDo( self.ApplyPartitionFnFn(), self.fn, *self.args, @@ -1261,14 +1261,14 @@ class WindowInto(ParDo): def infer_output_type(self, input_type): return input_type - def apply(self, pcoll): + def expand(self, pcoll): input_type = pcoll.element_type if input_type is not None: output_type = input_type self.with_input_types(input_type) self.with_output_types(output_type) - return super(WindowInto, self).apply(pcoll) + return super(WindowInto, self).expand(pcoll) # Python's pickling is broken for nested classes. @@ -1305,7 +1305,7 @@ class Flatten(PTransform): raise ValueError('Input to Flatten must be an iterable.') return pvalueish, pvalueish - def apply(self, pcolls): + def expand(self, pcolls): for pcoll in pcolls: self._check_pcollection(pcoll) return pvalue.PCollection(self.pipeline) @@ -1345,7 +1345,7 @@ class Create(PTransform): else: return Union[[trivial_inference.instance_to_type(v) for v in self.value]] - def apply(self, pbegin): + def expand(self, pbegin): assert isinstance(pbegin, pvalue.PBegin) self.pipeline = pbegin.pipeline return pvalue.PCollection(self.pipeline) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 2212d00..1bd7fb4 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -359,7 +359,7 @@ class PTransform(WithTypeHints, HasDisplayData): transform.label = new_label return transform - def apply(self, input_or_inputs): + def expand(self, input_or_inputs): raise NotImplementedError def __str__(self): @@ -493,7 +493,7 @@ class ChainedPTransform(PTransform): else: return NotImplemented - def apply(self, pval): + def expand(self, pval): return reduce(operator.or_, self._parts, pval) @@ -650,7 +650,7 @@ class CallablePTransform(PTransform): super(CallablePTransform, self).__init__(label=label) return self - def apply(self, pcoll): + def expand(self, pcoll): # Since the PTransform will be implemented entirely as a function # (once called), we need to pass through any type-hinting information that # may have been annotated via the .with_input_types() and @@ -700,7 +700,7 @@ def ptransform_fn(fn): super(CustomMapper, self).__init__() self.mapfn = mapfn - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | ParDo(self.mapfn) With either method the custom PTransform can be used in pipelines as if @@ -738,5 +738,5 @@ class _NamedPTransform(PTransform): def __ror__(self, pvalueish): return self.transform.__ror__(pvalueish, self.label) - def apply(self, pvalue): + def expand(self, pvalue): raise RuntimeError("Should never be applied directly.") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index e3b1026..9118fee 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -526,7 +526,7 @@ class PTransformTest(unittest.TestCase): def test_multi_input_ptransform(self): class DisjointUnion(PTransform): - def apply(self, pcollections): + def expand(self, pcollections): return (pcollections | beam.Flatten() | beam.Map(lambda x: (x, None)) @@ -545,7 +545,7 @@ class PTransformTest(unittest.TestCase): pvalueish = list(pvalueish) return pvalueish, sum([list(p.values()) for p in pvalueish], []) - def apply(self, pcoll_dicts): + def expand(self, pcoll_dicts): keys = reduce(operator.or_, [set(p.keys()) for p in pcoll_dicts]) res = {} for k in keys: @@ -575,7 +575,7 @@ class PTransformLabelsTest(unittest.TestCase): pardo = None - def apply(self, pcoll): + def expand(self, pcoll): self.pardo = beam.FlatMap('*do*', lambda x: [x + 1]) return pcoll | self.pardo http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/sideinputs.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index 05ba6ab..46731bf 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -50,7 +50,7 @@ class CreatePCollectionView(PTransform): # typehints.View[...]. return input_type - def apply(self, pcoll): + def expand(self, pcoll): return self.view @@ -68,7 +68,7 @@ class ViewAsSingleton(PTransform): self.has_default = has_default self.default_value = default_value - def apply(self, pcoll): + def expand(self, pcoll): self._check_pcollection(pcoll) input_type = pcoll.element_type output_type = input_type @@ -93,7 +93,7 @@ class ViewAsIterable(PTransform): label = 'ViewAsIterable(%s)' % label super(ViewAsIterable, self).__init__(label=label) - def apply(self, pcoll): + def expand(self, pcoll): self._check_pcollection(pcoll) input_type = pcoll.element_type output_type = typehints.Iterable[input_type] @@ -118,7 +118,7 @@ class ViewAsList(PTransform): label = 'ViewAsList(%s)' % label super(ViewAsList, self).__init__(label=label) - def apply(self, pcoll): + def expand(self, pcoll): self._check_pcollection(pcoll) input_type = pcoll.element_type output_type = typehints.List[input_type] @@ -144,7 +144,7 @@ class ViewAsDict(PTransform): label = 'ViewAsDict(%s)' % label super(ViewAsDict, self).__init__(label=label) - def apply(self, pcoll): + def expand(self, pcoll): self._check_pcollection(pcoll) input_type = pcoll.element_type key_type, value_type = ( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index ebe6ba9..9815996 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -99,7 +99,7 @@ class CoGroupByKey(PTransform): pcolls = tuple(pvalueish) return pcolls, pcolls - def apply(self, pcolls): + def expand(self, pcolls): """Performs CoGroupByKey on argument pcolls; see class docstring.""" # For associating values in K-V pairs with the PCollections they came from. def _pair_tag_with_value((key, value), tag): @@ -222,7 +222,7 @@ def assert_that(actual, matcher, label='assert_that'): class AssertThat(PTransform): - def apply(self, pipeline): + def expand(self, pipeline): return pipeline | 'singleton' >> Create([None]) | Map( match, AsList(actual | core.WindowInto(window.GlobalWindows()))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/write_ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py index e7cdbd4..9a1a7de 100644 --- a/sdks/python/apache_beam/transforms/write_ptransform_test.py +++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py @@ -84,7 +84,7 @@ class WriteToTestSink(PTransform): self.last_sink = None self.label = 'write_to_test_sink' - def apply(self, pcoll): + def expand(self, pcoll): self.last_sink = _TestSink(return_init_result=self.return_init_result, return_write_results=self.return_write_results) return pcoll | beam.io.Write(self.last_sink) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/typehints/typed_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index f2e8f12..329d657 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -202,7 +202,7 @@ class CustomTransformTest(unittest.TestCase): def _extract_input_pvalues(self, pvalueish): return pvalueish, (pvalueish['in0'], pvalueish['in1']) - def apply(self, pvalueish): + def expand(self, pvalueish): return {'out0': pvalueish['in0'], 'out1': pvalueish['in1']} # TODO(robertwb): (typecheck) Make these the default?