Restore (faster) logging context.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ecf9e3a3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ecf9e3a3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ecf9e3a3 Branch: refs/heads/python-sdk Commit: ecf9e3a3cc3dcbb3413403d4558c95d4a0097350 Parents: 7c9d77a Author: Robert Bradshaw <[email protected]> Authored: Thu Jul 21 15:24:01 2016 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Thu Jul 21 17:36:04 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/common.pxd | 8 +++++++- sdks/python/apache_beam/runners/common.py | 20 ++++++++++++++------ 2 files changed, 21 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf9e3a3/sdks/python/apache_beam/runners/common.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index f01a362..7191659 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -27,7 +27,7 @@ cdef class DoFnRunner(object): cdef object window_fn cdef object context # TODO(robertwb): Make this a DoFnContext cdef object tagged_receivers - cdef object logger + cdef object logging_context # TODO(robertwb): Make this a LoggingContext cdef object step_name cdef object main_receivers # TODO(robertwb): Make this a Receiver @@ -44,3 +44,9 @@ cdef class DoFnContext(object): cdef class Receiver(object): cdef receive(self, WindowedValue windowed_value) + + +cdef class LoggingContext(object): + # TODO(robertwb): Optimize "with [cdef class]" + cpdef enter(self) + cpdef exit(self) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf9e3a3/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 80db823..a565645 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -29,14 +29,12 @@ from apache_beam.transforms.window import WindowFn from apache_beam.utils.windowed_value import WindowedValue -class FakeLogger(object): - def PerThreadLoggingContext(self, *unused_args, **unused_kwargs): - return self +class LoggingContext(object): - def __enter__(self): + def enter(self): pass - def __exit__(self, *unused_args): + def exit(self): pass @@ -76,7 +74,8 @@ class DoFnRunner(object): self.window_fn = windowing.windowfn self.context = context self.tagged_receivers = tagged_receivers - self.logger = logger or FakeLogger() + self.logging_context = (logger.PerThreadLoggingContext(step_name=step_name) + if logger else LoggingContext()) self.step_name = step_name # Optimize for the common case. @@ -85,23 +84,32 @@ class DoFnRunner(object): def start(self): self.context.set_element(None) try: + self.logging_context.enter() self._process_outputs(None, self.dofn.start_bundle(self.context)) except BaseException as exn: self.reraise_augmented(exn) + finally: + self.logging_context.exit() def finish(self): self.context.set_element(None) try: + self.logging_context.enter() self._process_outputs(None, self.dofn.finish_bundle(self.context)) except BaseException as exn: self.reraise_augmented(exn) + finally: + self.logging_context.exit() def process(self, element): try: + self.logging_context.enter() self.context.set_element(element) self._process_outputs(element, self.dofn_process(self.context)) except BaseException as exn: self.reraise_augmented(exn) + finally: + self.logging_context.exit() def reraise_augmented(self, exn): if getattr(exn, '_tagged_with_step', False) or not self.step_name:
