[ https://issues.apache.org/jira/browse/BEAM-588?focusedWorklogId=167331&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167331 ]
ASF GitHub Bot logged work on BEAM-588:
---------------------------------------
Author: ASF GitHub Bot
Created on: 19/Nov/18 09:54
Start Date: 19/Nov/18 09:54
Worklog Time Spent: 10m
Work Description: robertwb closed pull request #6847: [BEAM-588] Implement profiling for Python DirectRunner and SDK Harness.
URL: https://github.com/apache/beam/pull/6847
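For a sense of usage, the new flags can be exercised end to end; a minimal sketch (paths are illustrative, --profile_cpu itself predates this change, and whether profiling kicks in depends on the runner path the pipeline takes):

import apache_beam as beam

# Minimal pipeline run with the profiling flags added in this PR.
# /tmp/beam-profiles is an illustrative local path.
with beam.Pipeline(argv=[
    '--runner=DirectRunner',
    '--profile_cpu', '--profile_location=/tmp/beam-profiles',
    '--profile_sample_rate=0.5', '--direct_runner_bundle_repeat=2',
]) as p:
  _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)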
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 9c2d4d00ed96..4cd4f014ac07 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -351,6 +351,12 @@ def _add_argparse_args(cls, parser):
help='DirectRunner uses stacked WindowedValues within a Bundle for '
'memory optimization. Set --no_direct_runner_use_stacked_bundle to '
'avoid it.')
+ parser.add_argument(
+ '--direct_runner_bundle_repeat',
+ type=int,
+ default=0,
+ help='replay every bundle this many extra times, for profiling '
+ 'and debugging')
class GoogleCloudOptions(PipelineOptions):
@@ -604,7 +610,12 @@ def _add_argparse_args(cls, parser):
help='Enable work item heap profiling.')
parser.add_argument('--profile_location',
default=None,
- help='GCS path for saving profiler data.')
+ help='path for saving profiler data.')
+ parser.add_argument('--profile_sample_rate',
+ type=float,
+ default=1.0,
+ help='A number between 0 and 1 indicating the ratio '
+ 'of bundles that should be profiled.')
class SetupOptions(PipelineOptions):
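The new flags surface through view_as like any other pipeline option; a minimal sketch, assuming they register on DirectOptions and ProfilingOptions as the runner code below reads them:

from apache_beam.options.pipeline_options import (
    DirectOptions, PipelineOptions, ProfilingOptions)

opts = PipelineOptions(
    ['--direct_runner_bundle_repeat=2', '--profile_sample_rate=0.25'])
# Each options view exposes the flags registered on its class.
assert opts.view_as(DirectOptions).direct_runner_bundle_repeat == 2
assert opts.view_as(ProfilingOptions).profile_sample_rate == 0.25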
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 832cba951808..fe8db344f9a8 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -18,12 +18,16 @@
"""A PipelineRunner using the SDK harness.
"""
from __future__ import absolute_import
+from __future__ import print_function
import collections
import contextlib
import copy
import logging
+import os
import queue
+import subprocess
+import sys
import threading
import time
from builtins import object
@@ -39,6 +43,7 @@
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.options import pipeline_options
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
@@ -53,6 +58,7 @@
from apache_beam.runners.worker import sdk_worker
from apache_beam.transforms import trigger
from apache_beam.transforms.window import GlobalWindows
+from apache_beam.utils import profiler
from apache_beam.utils import proto_utils
# This module is experimental. No backwards-compatibility guarantees.
@@ -222,6 +228,7 @@ def __init__(self, use_grpc=False,
sdk_harness_factory=None, bundle_repeat=0):
self._sdk_harness_factory = sdk_harness_factory
self._bundle_repeat = bundle_repeat
self._progress_frequency = None
+ self._profiler_factory = None
def _next_uid(self):
self._last_uid += 1
@@ -235,11 +242,54 @@ def run_pipeline(self, pipeline):
# are known to be KVs.
from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
pipeline.visit(DataflowRunner.group_by_key_input_visitor())
+ self._bundle_repeat = self._bundle_repeat or pipeline._options.view_as(
+ pipeline_options.DirectOptions).direct_runner_bundle_repeat
+ self._profiler_factory = profiler.Profile.factory_from_options(
+ pipeline._options.view_as(pipeline_options.ProfilingOptions))
return self.run_via_runner_api(pipeline.to_runner_api())
def run_via_runner_api(self, pipeline_proto):
return self.run_stages(*self.create_stages(pipeline_proto))
+ @contextlib.contextmanager
+ def maybe_profile(self):
+ if self._profiler_factory:
+ try:
+ profile_id = 'direct-' + subprocess.check_output(
+ ['git', 'rev-parse', '--abbrev-ref', 'HEAD']
+ ).decode(errors='ignore').strip()
+ except subprocess.CalledProcessError:
+ profile_id = 'direct-unknown'
+ profiler = self._profiler_factory(profile_id, time_prefix='')
+ else:
+ profiler = None
+
+ if profiler:
+ with profiler:
+ yield
+ if not self._bundle_repeat:
+ logging.warning(
+ 'The --direct_runner_bundle_repeat option is not set; '
+ 'a significant portion of the profile may be one-time overhead.')
+ path = profiler.profile_output
+ print('CPU Profile written to %s' % path)
+ try:
+ import gprof2dot # pylint: disable=unused-variable
+ if not subprocess.call([
+ sys.executable, '-m', 'gprof2dot',
+ '-f', 'pstats', path, '-o', path + '.dot']):
+ if not subprocess.call(
+ ['dot', '-Tsvg', '-o', path + '.svg', path + '.dot']):
+ print('CPU Profile rendering at file://%s.svg'
+ % os.path.abspath(path))
+ except ImportError:
+ # pylint: disable=superfluous-parens
+ print('Please install gprof2dot and dot for profile renderings.')
+
+ else:
+ # Empty context.
+ yield
+
def create_stages(self, pipeline_proto):
# First define a couple of helpers.
@@ -1011,14 +1061,15 @@ def run_stages(self, pipeline_components, stages,
safe_coders):
monitoring_infos_by_stage = {}
try:
- pcoll_buffers = collections.defaultdict(list)
- for stage in stages:
- stage_results = self.run_stage(
- controller, pipeline_components, stage,
- pcoll_buffers, safe_coders)
- metrics_by_stage[stage.name] = stage_results.process_bundle.metrics
- monitoring_infos_by_stage[stage.name] = (
- stage_results.process_bundle.monitoring_infos)
+ with self.maybe_profile():
+ pcoll_buffers = collections.defaultdict(list)
+ for stage in stages:
+ stage_results = self.run_stage(
+ controller, pipeline_components, stage,
+ pcoll_buffers, safe_coders)
+ metrics_by_stage[stage.name] = stage_results.process_bundle.metrics
+ monitoring_infos_by_stage[stage.name] = (
+ stage_results.process_bundle.monitoring_infos)
finally:
controller.close()
return RunnerResult(
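The rendering step in maybe_profile above shells out to gprof2dot and Graphviz dot; the same post-processing can be run standalone (the pstats path is illustrative, and both tools must be installed):

import subprocess
import sys

path = '/tmp/beam-profiles/profile'  # illustrative cProfile/pstats dump
# pstats -> dot graph -> SVG, mirroring the calls in maybe_profile.
subprocess.check_call([sys.executable, '-m', 'gprof2dot',
                       '-f', 'pstats', path, '-o', path + '.dot'])
subprocess.check_call(['dot', '-Tsvg', '-o', path + '.svg', path + '.dot'])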
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index fa811262183c..3c7f3e721802 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -46,7 +46,8 @@
class SdkHarness(object):
REQUEST_METHOD_PREFIX = '_request_'
- def __init__(self, control_address, worker_count, credentials=None):
+ def __init__(self, control_address, worker_count, credentials=None,
+ profiler_factory=None):
self._worker_count = worker_count
self._worker_index = 0
if credentials is None:
@@ -63,6 +64,7 @@ def __init__(self, control_address, worker_count, credentials=None):
self._data_channel_factory = data_plane.GrpcClientDataChannelFactory(
credentials)
self._state_handler_factory = GrpcStateHandlerFactory()
+ self._profiler_factory = profiler_factory
self.workers = queue.Queue()
# one thread is enough for getting the progress report.
# Assumption:
@@ -97,7 +99,8 @@ def run(self):
SdkWorker(
state_handler_factory=self._state_handler_factory,
data_channel_factory=self._data_channel_factory,
- fns=self._fns))
+ fns=self._fns,
+ profiler_factory=self._profiler_factory))
def get_responses():
while True:
@@ -201,12 +204,14 @@ def task():
class SdkWorker(object):
- def __init__(self, state_handler_factory, data_channel_factory, fns):
+ def __init__(self, state_handler_factory, data_channel_factory, fns,
+ profiler_factory=None):
self.fns = fns
self.state_handler_factory = state_handler_factory
self.data_channel_factory = data_channel_factory
self.active_bundle_processors = {}
self.cached_bundle_processors = collections.defaultdict(list)
+ self.profiler_factory = profiler_factory
def do_instruction(self, request):
request_type = request.WhichOneof('request')
@@ -228,7 +233,8 @@ def process_bundle(self, request, instruction_id):
with self.get_bundle_processor(
instruction_id,
request.process_bundle_descriptor_reference) as bundle_processor:
- bundle_processor.process_bundle(instruction_id)
+ with self.maybe_profile(instruction_id):
+ bundle_processor.process_bundle(instruction_id)
return beam_fn_api_pb2.InstructionResponse(
instruction_id=instruction_id,
process_bundle=beam_fn_api_pb2.ProcessBundleResponse(
@@ -268,6 +274,18 @@ def process_bundle_progress(self, request, instruction_id):
metrics=processor.metrics() if processor else None,
monitoring_infos=processor.monitoring_infos() if processor else
[]))
+ @contextlib.contextmanager
+ def maybe_profile(self, instruction_id):
+ if self.profiler_factory:
+ profiler = self.profiler_factory(instruction_id)
+ if profiler:
+ with profiler:
+ yield
+ else:
+ yield
+ else:
+ yield
+
class StateHandlerFactory(with_metaclass(abc.ABCMeta, object)):
"""An abstract factory for creating ``DataChannel``."""
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index da21418e4cc3..2e3ae165fc5e 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -31,12 +31,14 @@
from google.protobuf import text_format
from apache_beam.internal import pickler
+from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler
from apache_beam.runners.worker.sdk_worker import SdkHarness
+from apache_beam.utils import profiler
# This module is experimental. No backwards-compatibility guarantees.
@@ -138,7 +140,10 @@ def main(unused_argv):
assert not service_descriptor.oauth2_client_credentials_grant.url
SdkHarness(
control_address=service_descriptor.url,
- worker_count=_get_worker_count(sdk_pipeline_options)).run()
+ worker_count=_get_worker_count(sdk_pipeline_options),
+ profiler_factory=profiler.Profile.factory_from_options(
+ sdk_pipeline_options.view_as(pipeline_options.ProfilingOptions))
+ ).run()
logging.info('Python sdk harness exiting.')
except: # pylint: disable=broad-except
logging.exception('Python sdk harness failed: ')
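factory_from_options (see profiler.py below) returns None unless --profile_cpu is set, and the factory it returns yields a profiler only for sampled bundles; a minimal sketch (flag values illustrative):

from apache_beam.options.pipeline_options import PipelineOptions, ProfilingOptions
from apache_beam.utils.profiler import Profile

opts = PipelineOptions(
    ['--profile_cpu', '--profile_location=/tmp/beam-profiles',
     '--profile_sample_rate=0.1'])
factory = Profile.factory_from_options(opts.view_as(ProfilingOptions))
p = factory('bundle-42')  # a Profile for ~10% of calls, otherwise None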
diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py
index 18a712fff642..0606744bc1b9 100644
--- a/sdks/python/apache_beam/utils/profiler.py
+++ b/sdks/python/apache_beam/utils/profiler.py
@@ -27,12 +27,15 @@
import logging
import os
import pstats
+import random
import tempfile
import time
import warnings
from builtins import object
from threading import Timer
+from apache_beam.io import filesystems
+
class Profile(object):
"""cProfile wrapper context for saving and logging profiler results."""
@@ -40,12 +43,14 @@ class Profile(object):
SORTBY = 'cumulative'
def __init__(self, profile_id, profile_location=None, log_results=False,
- file_copy_fn=None):
+ file_copy_fn=None, time_prefix='%Y-%m-%d_%H_%M_%S-'):
self.stats = None
self.profile_id = str(profile_id)
self.profile_location = profile_location
self.log_results = log_results
- self.file_copy_fn = file_copy_fn
+ self.file_copy_fn = file_copy_fn or self.default_file_copy_fn
+ self.time_prefix = time_prefix
+ self.profile_output = None
def __enter__(self):
logging.info('Start profiling: %s', self.profile_id)
@@ -57,16 +62,19 @@ def __exit__(self, *args):
self.profile.disable()
logging.info('Stop profiling: %s', self.profile_id)
- if self.profile_location and self.file_copy_fn:
+ if self.profile_location:
dump_location = os.path.join(
- self.profile_location, 'profile',
- ('%s-%s' % (time.strftime('%Y-%m-%d_%H_%M_%S'), self.profile_id)))
+ self.profile_location,
+ time.strftime(self.time_prefix + self.profile_id))
fd, filename = tempfile.mkstemp()
- self.profile.dump_stats(filename)
- logging.info('Copying profiler data to: [%s]', dump_location)
- self.file_copy_fn(filename, dump_location) # pylint: disable=protected-access
- os.close(fd)
- os.remove(filename)
+ try:
+ os.close(fd)
+ self.profile.dump_stats(filename)
+ logging.info('Copying profiler data to: [%s]', dump_location)
+ self.file_copy_fn(filename, dump_location)
+ finally:
+ os.remove(filename)
+ self.profile_output = dump_location
if self.log_results:
s = io.StringIO()
@@ -75,6 +83,24 @@ def __exit__(self, *args):
self.stats.print_stats()
logging.info('Profiler data: [%s]', s.getvalue())
+ @staticmethod
+ def default_file_copy_fn(src, dest):
+ dest_handle = filesystems.FileSystems.create(dest + '.tmp')
+ try:
+ with open(src, 'rb') as src_handle:
+ dest_handle.write(src_handle.read())
+ finally:
+ dest_handle.close()
+ filesystems.FileSystems.rename([dest + '.tmp'], [dest])
+
+ @staticmethod
+ def factory_from_options(options):
+ if options.profile_cpu:
+ def create_profiler(profile_id, **kwargs):
+ if random.random() < options.profile_sample_rate:
+ return Profile(profile_id, options.profile_location, **kwargs)
+ return create_profiler
+
class MemoryReporter(object):
"""A memory reporter that reports the memory usage and heap profile.
> All runners should support ProfilingOptions
> -------------------------------------------
>
> Key: BEAM-588
> URL: https://issues.apache.org/jira/browse/BEAM-588
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Ahmet Altay
> Priority: Minor
> Labels: newbie, starter
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/utils/options.py#L366
> This is useful for profiling pipelines in different environments.