Repository: beam Updated Branches: refs/heads/master ba539b6ce -> 36ed6dc3c
[BEAM-1925] Remove deprecated context param from DoFn Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c0a64744 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c0a64744 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c0a64744 Branch: refs/heads/master Commit: c0a64744dc91ed08d50abd5ef98cf10e41035c4d Parents: ba539b6 Author: Sourabh Bajaj <[email protected]> Authored: Fri Apr 28 10:47:38 2017 -0700 Committer: [email protected] <[email protected]> Committed: Mon May 1 15:42:05 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/pipeline_test.py | 10 ---------- sdks/python/apache_beam/runners/common.py | 24 ++++++++---------------- sdks/python/apache_beam/transforms/core.py | 1 - 3 files changed, 8 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c0a64744/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 05503bd..12348dc 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -298,16 +298,6 @@ class DoFnTest(unittest.TestCase): assert_that(pcoll, equal_to([11, 12])) pipeline.run() - def test_context_param(self): - class TestDoFn(DoFn): - def process(self, element, context=DoFn.ContextParam): - yield context.element + 10 - - pipeline = TestPipeline() - pcoll = pipeline | 'Create' >> Create([1, 2])| 'Do' >> ParDo(TestDoFn()) - assert_that(pcoll, equal_to([11, 12])) - pipeline.run() - def test_side_input_no_tag(self): class TestDoFn(DoFn): def process(self, element, prefix, suffix): http://git-wip-us.apache.org/repos/asf/beam/blob/c0a64744/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index e2a6949..1c3e541 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -99,13 +99,11 @@ class DoFnSignature(object): self._validate_bundle_method(self.finish_bundle_method) def _validate_bundle_method(self, method_wrapper): - # Bundle methods may only contain ContextParam. - # Here we use the fact that every DoFn parameter defined in core.DoFn has # the value that is the same as the name of the parameter and ends with # string 'Param'. - unsupported_dofn_params = [i for i in core.DoFn.__dict__ if ( - i.endswith('Param') and i != 'ContextParam')] + unsupported_dofn_params = [i for i in core.DoFn.__dict__ if + i.endswith('Param')] for param in unsupported_dofn_params: assert param not in method_wrapper.defaults @@ -158,20 +156,18 @@ class DoFnInvoker(object): def invoke_start_bundle(self): """Invokes the DoFn.start_bundle() method. """ - defaults = self.signature.start_bundle_method.defaults - args = [self.context if d == core.DoFn.ContextParam else d - for d in defaults] + args_for_start_bundle = self.signature.start_bundle_method.defaults self.output_processor.start_bundle_outputs( - self.signature.start_bundle_method.method_value(*args)) + self.signature.start_bundle_method.method_value( + *args_for_start_bundle)) def invoke_finish_bundle(self): """Invokes the DoFn.finish_bundle() method. """ - defaults = self.signature.finish_bundle_method.defaults - args = [self.context if d == core.DoFn.ContextParam else d - for d in defaults] + args_for_finish_bundle = self.signature.finish_bundle_method.defaults self.output_processor.finish_bundle_outputs( - self.signature.finish_bundle_method.method_value(*args)) + self.signature.finish_bundle_method.method_value( + *args_for_finish_bundle)) class SimpleInvoker(DoFnInvoker): @@ -237,8 +233,6 @@ class PerWindowInvoker(DoFnInvoker): for a, d in zip(arguments[-len(defaults):], defaults): if d == core.DoFn.ElementParam: args_with_placeholders.append(ArgPlaceholder(d)) - elif d == core.DoFn.ContextParam: - args_with_placeholders.append(ArgPlaceholder(d)) elif d == core.DoFn.WindowParam: args_with_placeholders.append(ArgPlaceholder(d)) elif d == core.DoFn.TimestampParam: @@ -291,8 +285,6 @@ class PerWindowInvoker(DoFnInvoker): for i, p in self.placeholders: if p == core.DoFn.ElementParam: args_for_process[i] = windowed_value.value - elif p == core.DoFn.ContextParam: - args_for_process[i] = self.context elif p == core.DoFn.WindowParam: args_for_process[i] = window elif p == core.DoFn.TimestampParam: http://git-wip-us.apache.org/repos/asf/beam/blob/c0a64744/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 64911d6..918c46e 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -127,7 +127,6 @@ class DoFn(WithTypeHints, HasDisplayData): """ ElementParam = 'ElementParam' - ContextParam = 'ContextParam' SideInputParam = 'SideInputParam' TimestampParam = 'TimestampParam' WindowParam = 'WindowParam'
