[ https://issues.apache.org/jira/browse/BEAM-588?focusedWorklogId=167331&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167331 ]

ASF GitHub Bot logged work on BEAM-588:
---------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Nov/18 09:54
            Start Date: 19/Nov/18 09:54
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #6847: [BEAM-588] Implement profiling for Python DirectRunner and SDK Harness.
URL: https://github.com/apache/beam/pull/6847

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

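For orientation before the diff: the change touches two option groups. DirectOptions gains --direct_runner_bundle_repeat, and ProfilingOptions gains --profile_sample_rate (while --profile_location now accepts any filesystem path, not just GCS). A minimal sketch of enabling them from user code, assuming only the flag names added below (the location path is illustrative):

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--profile_cpu',                        # pre-existing flag: turn on cProfile
        '--profile_location', '/tmp/profiles',  # any filesystem path now works
        '--profile_sample_rate', '0.5',         # new: profile roughly half the bundles
        '--direct_runner_bundle_repeat', '3',   # new: replay each bundle 3 extra times
    ])

Repeating bundles amortizes one-time setup cost across the profile; sampling keeps profiling overhead bounded on large jobs.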

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 9c2d4d00ed96..4cd4f014ac07 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -351,6 +351,12 @@ def _add_argparse_args(cls, parser):
         help='DirectRunner uses stacked WindowedValues within a Bundle for '
         'memory optimization. Set --no_direct_runner_use_stacked_bundle to '
         'avoid it.')
+    parser.add_argument(
+        '--direct_runner_bundle_repeat',
+        type=int,
+        default=0,
+        help='Replay every bundle this many extra times, for profiling '
+        'and debugging.')
 
 
 class GoogleCloudOptions(PipelineOptions):
@@ -604,7 +610,12 @@ def _add_argparse_args(cls, parser):
                         help='Enable work item heap profiling.')
     parser.add_argument('--profile_location',
                         default=None,
-                        help='GCS path for saving profiler data.')
+                        help='path for saving profiler data.')
+    parser.add_argument('--profile_sample_rate',
+                        type=float,
+                        default=1.0,
+                        help='A number between 0 and 1 indicating the ratio '
+                        'of bundles that should be profiled.')
 
 
 class SetupOptions(PipelineOptions):
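
The runner code reads these flags back through option views. A small sketch, assuming only the names defined in the hunks above:

    from apache_beam.options import pipeline_options
    from apache_beam.options.pipeline_options import PipelineOptions

    opts = PipelineOptions(
        ['--direct_runner_bundle_repeat', '2', '--profile_sample_rate', '0.25'])
    assert opts.view_as(
        pipeline_options.DirectOptions).direct_runner_bundle_repeat == 2
    assert opts.view_as(
        pipeline_options.ProfilingOptions).profile_sample_rate == 0.25
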
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 832cba951808..fe8db344f9a8 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -18,12 +18,16 @@
 """A PipelineRunner using the SDK harness.
 """
 from __future__ import absolute_import
+from __future__ import print_function
 
 import collections
 import contextlib
 import copy
 import logging
+import os
 import queue
+import subprocess
+import sys
 import threading
 import time
 from builtins import object
@@ -39,6 +43,7 @@
 from apache_beam.metrics import monitoring_infos
 from apache_beam.metrics.execution import MetricKey
 from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.options import pipeline_options
 from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
@@ -53,6 +58,7 @@
 from apache_beam.runners.worker import sdk_worker
 from apache_beam.transforms import trigger
 from apache_beam.transforms.window import GlobalWindows
+from apache_beam.utils import profiler
 from apache_beam.utils import proto_utils
 
 # This module is experimental. No backwards-compatibility guarantees.
@@ -222,6 +228,7 @@ def __init__(self, use_grpc=False, sdk_harness_factory=None, bundle_repeat=0):
     self._sdk_harness_factory = sdk_harness_factory
     self._bundle_repeat = bundle_repeat
     self._progress_frequency = None
+    self._profiler_factory = None
 
   def _next_uid(self):
     self._last_uid += 1
@@ -235,11 +242,54 @@ def run_pipeline(self, pipeline):
     # are known to be KVs.
     from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
     pipeline.visit(DataflowRunner.group_by_key_input_visitor())
+    self._bundle_repeat = self._bundle_repeat or pipeline._options.view_as(
+        pipeline_options.DirectOptions).direct_runner_bundle_repeat
+    self._profiler_factory = profiler.Profile.factory_from_options(
+        pipeline._options.view_as(pipeline_options.ProfilingOptions))
     return self.run_via_runner_api(pipeline.to_runner_api())
 
   def run_via_runner_api(self, pipeline_proto):
     return self.run_stages(*self.create_stages(pipeline_proto))
 
+  @contextlib.contextmanager
+  def maybe_profile(self):
+    if self._profiler_factory:
+      try:
+        profile_id = 'direct-' + subprocess.check_output(
+            ['git', 'rev-parse', '--abbrev-ref', 'HEAD']
+        ).decode(errors='ignore').strip()
+      except subprocess.CalledProcessError:
+        profile_id = 'direct-unknown'
+      profiler = self._profiler_factory(profile_id, time_prefix='')
+    else:
+      profiler = None
+
+    if profiler:
+      with profiler:
+        yield
+      if not self._bundle_repeat:
+        logging.warning(
+            'The --direct_runner_bundle_repeat option is not set; '
+            'a significant portion of the profile may be one-time overhead.')
+      path = profiler.profile_output
+      print('CPU Profile written to %s' % path)
+      try:
+        import gprof2dot  # pylint: disable=unused-variable
+        if not subprocess.call([
+            sys.executable, '-m', 'gprof2dot',
+            '-f', 'pstats', path, '-o', path + '.dot']):
+          if not subprocess.call(
+              ['dot', '-Tsvg', '-o', path + '.svg', path + '.dot']):
+            print('CPU Profile rendering at file://%s.svg'
+                  % os.path.abspath(path))
+      except ImportError:
+        # pylint: disable=superfluous-parens
+        print('Please install gprof2dot and dot for profile renderings.')
+
+    else:
+      # Empty context.
+      yield
+
   def create_stages(self, pipeline_proto):
 
     # First define a couple of helpers.
@@ -1011,14 +1061,15 @@ def run_stages(self, pipeline_components, stages, safe_coders):
     monitoring_infos_by_stage = {}
 
     try:
-      pcoll_buffers = collections.defaultdict(list)
-      for stage in stages:
-        stage_results = self.run_stage(
-            controller, pipeline_components, stage,
-            pcoll_buffers, safe_coders)
-        metrics_by_stage[stage.name] = stage_results.process_bundle.metrics
-        monitoring_infos_by_stage[stage.name] = (
-            stage_results.process_bundle.monitoring_infos)
+      with self.maybe_profile():
+        pcoll_buffers = collections.defaultdict(list)
+        for stage in stages:
+          stage_results = self.run_stage(
+              controller, pipeline_components, stage,
+              pcoll_buffers, safe_coders)
+          metrics_by_stage[stage.name] = stage_results.process_bundle.metrics
+          monitoring_infos_by_stage[stage.name] = (
+              stage_results.process_bundle.monitoring_infos)
     finally:
       controller.close()
     return RunnerResult(
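
maybe_profile above is a conditional context manager: it must yield exactly once whether or not profiling is active. A self-contained illustration of the same pattern, with cProfile standing in for Beam's Profile wrapper (all names here are hypothetical):

    import contextlib
    import cProfile
    import pstats

    @contextlib.contextmanager
    def maybe_profile(enabled):
      if enabled:
        prof = cProfile.Profile()
        prof.enable()
        try:
          yield
        finally:
          prof.disable()
          # Print the five most expensive calls, cumulative time first.
          pstats.Stats(prof).sort_stats('cumulative').print_stats(5)
      else:
        yield  # empty context: the body still runs, just unprofiled

    with maybe_profile(enabled=True):
      sum(i * i for i in range(100000))
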
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index fa811262183c..3c7f3e721802 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -46,7 +46,8 @@
 class SdkHarness(object):
   REQUEST_METHOD_PREFIX = '_request_'
 
-  def __init__(self, control_address, worker_count, credentials=None):
+  def __init__(self, control_address, worker_count, credentials=None,
+               profiler_factory=None):
     self._worker_count = worker_count
     self._worker_index = 0
     if credentials is None:
@@ -63,6 +64,7 @@ def __init__(self, control_address, worker_count, credentials=None):
     self._data_channel_factory = data_plane.GrpcClientDataChannelFactory(
         credentials)
     self._state_handler_factory = GrpcStateHandlerFactory()
+    self._profiler_factory = profiler_factory
     self.workers = queue.Queue()
     # one thread is enough for getting the progress report.
     # Assumption:
@@ -97,7 +99,8 @@ def run(self):
           SdkWorker(
               state_handler_factory=self._state_handler_factory,
               data_channel_factory=self._data_channel_factory,
-              fns=self._fns))
+              fns=self._fns,
+              profiler_factory=self._profiler_factory))
 
     def get_responses():
       while True:
@@ -201,12 +204,14 @@ def task():
 
 class SdkWorker(object):
 
-  def __init__(self, state_handler_factory, data_channel_factory, fns):
+  def __init__(self, state_handler_factory, data_channel_factory, fns,
+               profiler_factory=None):
     self.fns = fns
     self.state_handler_factory = state_handler_factory
     self.data_channel_factory = data_channel_factory
     self.active_bundle_processors = {}
     self.cached_bundle_processors = collections.defaultdict(list)
+    self.profiler_factory = profiler_factory
 
   def do_instruction(self, request):
     request_type = request.WhichOneof('request')
@@ -228,7 +233,8 @@ def process_bundle(self, request, instruction_id):
     with self.get_bundle_processor(
         instruction_id,
         request.process_bundle_descriptor_reference) as bundle_processor:
-      bundle_processor.process_bundle(instruction_id)
+      with self.maybe_profile(instruction_id):
+        bundle_processor.process_bundle(instruction_id)
       return beam_fn_api_pb2.InstructionResponse(
           instruction_id=instruction_id,
           process_bundle=beam_fn_api_pb2.ProcessBundleResponse(
@@ -268,6 +274,18 @@ def process_bundle_progress(self, request, instruction_id):
             metrics=processor.metrics() if processor else None,
            monitoring_infos=processor.monitoring_infos() if processor else []))
 
+  @contextlib.contextmanager
+  def maybe_profile(self, instruction_id):
+    if self.profiler_factory:
+      profiler = self.profiler_factory(instruction_id)
+      if profiler:
+        with profiler:
+          yield
+      else:
+        yield
+    else:
+      yield
+
 
 class StateHandlerFactory(with_metaclass(abc.ABCMeta, object)):
   """An abstract factory for creating ``DataChannel``."""
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index da21418e4cc3..2e3ae165fc5e 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -31,12 +31,14 @@
 from google.protobuf import text_format
 
 from apache_beam.internal import pickler
+from apache_beam.options import pipeline_options
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler
 from apache_beam.runners.worker.sdk_worker import SdkHarness
+from apache_beam.utils import profiler
 
 # This module is experimental. No backwards-compatibility guarantees.
 
@@ -138,7 +140,10 @@ def main(unused_argv):
     assert not service_descriptor.oauth2_client_credentials_grant.url
     SdkHarness(
         control_address=service_descriptor.url,
-        worker_count=_get_worker_count(sdk_pipeline_options)).run()
+        worker_count=_get_worker_count(sdk_pipeline_options),
+        profiler_factory=profiler.Profile.factory_from_options(
+            sdk_pipeline_options.view_as(pipeline_options.ProfilingOptions))
+    ).run()
     logging.info('Python sdk harness exiting.')
   except:  # pylint: disable=broad-except
     logging.exception('Python sdk harness failed: ')
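
This wiring is safe in the default case because factory_from_options returns None unless --profile_cpu is set, so SdkHarness carries profiler_factory=None and per-bundle profiling becomes a no-op. A quick check, under that assumption:

    from apache_beam.options import pipeline_options
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.utils import profiler

    opts = PipelineOptions([])  # --profile_cpu not set
    assert profiler.Profile.factory_from_options(
        opts.view_as(pipeline_options.ProfilingOptions)) is None
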
diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py
index 18a712fff642..0606744bc1b9 100644
--- a/sdks/python/apache_beam/utils/profiler.py
+++ b/sdks/python/apache_beam/utils/profiler.py
@@ -27,12 +27,15 @@
 import logging
 import os
 import pstats
+import random
 import tempfile
 import time
 import warnings
 from builtins import object
 from threading import Timer
 
+from apache_beam.io import filesystems
+
 
 class Profile(object):
   """cProfile wrapper context for saving and logging profiler results."""
@@ -40,12 +43,14 @@ class Profile(object):
   SORTBY = 'cumulative'
 
   def __init__(self, profile_id, profile_location=None, log_results=False,
-               file_copy_fn=None):
+               file_copy_fn=None, time_prefix='%Y-%m-%d_%H_%M_%S-'):
     self.stats = None
     self.profile_id = str(profile_id)
     self.profile_location = profile_location
     self.log_results = log_results
-    self.file_copy_fn = file_copy_fn
+    self.file_copy_fn = file_copy_fn or self.default_file_copy_fn
+    self.time_prefix = time_prefix
+    self.profile_output = None
 
   def __enter__(self):
     logging.info('Start profiling: %s', self.profile_id)
@@ -57,16 +62,19 @@ def __exit__(self, *args):
     self.profile.disable()
     logging.info('Stop profiling: %s', self.profile_id)
 
-    if self.profile_location and self.file_copy_fn:
+    if self.profile_location:
       dump_location = os.path.join(
-          self.profile_location, 'profile',
-          ('%s-%s' % (time.strftime('%Y-%m-%d_%H_%M_%S'), self.profile_id)))
+          self.profile_location,
+          time.strftime(self.time_prefix + self.profile_id))
       fd, filename = tempfile.mkstemp()
-      self.profile.dump_stats(filename)
-      logging.info('Copying profiler data to: [%s]', dump_location)
-      self.file_copy_fn(filename, dump_location)  # pylint: disable=protected-access
-      os.close(fd)
-      os.remove(filename)
+      try:
+        os.close(fd)
+        self.profile.dump_stats(filename)
+        logging.info('Copying profiler data to: [%s]', dump_location)
+        self.file_copy_fn(filename, dump_location)
+      finally:
+        os.remove(filename)
+      self.profile_output = dump_location
 
     if self.log_results:
       s = io.StringIO()
@@ -75,6 +83,24 @@ def __exit__(self, *args):
       self.stats.print_stats()
       logging.info('Profiler data: [%s]', s.getvalue())
 
+  @staticmethod
+  def default_file_copy_fn(src, dest):
+    dest_handle = filesystems.FileSystems.create(dest + '.tmp')
+    try:
+      with open(src, 'rb') as src_handle:
+        dest_handle.write(src_handle.read())
+    finally:
+      dest_handle.close()
+    filesystems.FileSystems.rename([dest + '.tmp'], [dest])
+
+  @staticmethod
+  def factory_from_options(options):
+    if options.profile_cpu:
+      def create_profiler(profile_id, **kwargs):
+        if random.random() < options.profile_sample_rate:
+          return Profile(profile_id, options.profile_location, **kwargs)
+      return create_profiler
+
 
 class MemoryReporter(object):
   """A memory reporter that reports the memory usage and heap profile.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 167331)
    Time Spent: 1h 10m  (was: 1h)

> All runners should support ProfilingOptions
> -------------------------------------------
>
>                 Key: BEAM-588
>                 URL: https://issues.apache.org/jira/browse/BEAM-588
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Ahmet Altay
>            Priority: Minor
>              Labels: newbie, starter
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/utils/options.py#L366
> This is useful for profiling pipelines in different environments.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
