Repository: beam Updated Branches: refs/heads/master 074031cac -> 63dc08ee1
To add sdks/python/utils/profiler a MemoryReporter that tracks heap profiles. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a592053 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a592053 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a592053 Branch: refs/heads/master Commit: 0a59205314d21f452b91848496d6ae21e369d7df Parents: 074031c Author: Younghee Kwon <[email protected]> Authored: Mon Feb 6 12:35:50 2017 -0800 Committer: Ahmet Altay <[email protected]> Committed: Tue Feb 7 10:16:42 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/utils/profiler.py | 79 +++++++++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0a592053/sdks/python/apache_beam/utils/profiler.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py index 1214172..3599f98 100644 --- a/sdks/python/apache_beam/utils/profiler.py +++ b/sdks/python/apache_beam/utils/profiler.py @@ -24,7 +24,8 @@ import pstats import StringIO import tempfile import time - +from threading import Timer +import warnings from apache_beam.utils.dependency import _dependency_file_copy @@ -67,3 +68,79 @@ class Profile(object): self.profile, stream=s).sort_stats(Profile.SORTBY) self.stats.print_stats() logging.info('Profiler data: [%s]', s.getvalue()) + + +class MemoryReporter(object): + """A memory reporter that reports the memory usage and heap profile. + Usage: + mr = MemoryReporter(interval_second=30.0) + mr.start() + while ... + <do something> + # this will report continuously with 30 seconds between reports. + mr.stop() + + NOTE: A reporter with start() should always stop(), or the parent process can + never finish. 
+ + Or simply the following which does start() and stop(): + with MemoryReporter(interval_second=100): + while ... + <do something> + + Also it could report on demand without continuous reporting. + mr = MemoryReporter() # default interval 60s but not started. + <do something> + mr.report_once() + """ + + def __init__(self, interval_second=60.0): + # guppy might not be installed. http://pypi.python.org/pypi/guppy/0.1.10 + # The reporter can be set up only when guppy is installed (and guppy cannot + # be added to the required packages in setup.py, since it's not available + # on all platforms). + try: + from guppy import hpy # pylint: disable=import-error + self._hpy = hpy + self._interval_second = interval_second + self._timer = None + except ImportError: + warnings.warn('guppy is not installed; MemoryReporter not available.') + self._hpy = None + self._enabled = False + + def __enter__(self): + self.start() + return self + + def __exit__(self, *args): + self.stop() + + def start(self): + if self._enabled or not self._hpy: + return + self._enabled = True + + def report_with_interval(): + if not self._enabled: + return + self.report_once() + self._timer = Timer(self._interval_second, report_with_interval) + self._timer.start() + + self._timer = Timer(self._interval_second, report_with_interval) + self._timer.start() + + def stop(self): + if not self._enabled: + return + self._timer.cancel() + self._enabled = False + + def report_once(self): + if not self._hpy: + return + report_start_time = time.time() + heap_profile = self._hpy().heap() + logging.info('*** MemoryReport Heap:\n %s\n MemoryReport took %.1f seconds', + heap_profile, time.time() - report_start_time)
