Cache dofn.proces method. Saves another couple percent.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c9d77ac Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c9d77ac Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c9d77ac Branch: refs/heads/python-sdk Commit: 7c9d77ac2509c7625c5709f885f8b5aadb4f9f74 Parents: f99aa77 Author: Robert Bradshaw <[email protected]> Authored: Thu Jul 21 12:11:03 2016 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Thu Jul 21 17:36:04 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/common.pxd | 1 + sdks/python/apache_beam/runners/common.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c9d77ac/sdks/python/apache_beam/runners/common.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index e855376..f01a362 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -23,6 +23,7 @@ cdef type SideOutputValue, TimestampedValue cdef class DoFnRunner(object): cdef object dofn + cdef object dofn_process cdef object window_fn cdef object context # TODO(robertwb): Make this a DoFnContext cdef object tagged_receivers http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c9d77ac/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 134fb06..80db823 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -56,6 +56,7 @@ class DoFnRunner(object): step_name=None): if not args and not kwargs: self.dofn = fn + self.dofn_process = fn.process else: args, kwargs = util.insert_values_in_args(args, kwargs, side_inputs) @@ -70,6 +71,8 @@ class DoFnRunner(object): def finish_bundle(self, context): return fn.finish_bundle(context) self.dofn = CurriedFn() + self.dofn_process = lambda context: fn.process(context, *args, **kwargs) + self.window_fn = windowing.windowfn self.context = context self.tagged_receivers = tagged_receivers @@ -96,7 +99,7 @@ class DoFnRunner(object): def process(self, element): try: self.context.set_element(element) - self._process_outputs(element, self.dofn.process(self.context)) + self._process_outputs(element, self.dofn_process(self.context)) except BaseException as exn: self.reraise_augmented(exn)
