[ https://issues.apache.org/jira/browse/BEAM-8626?focusedWorklogId=373369&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-373369 ]

ASF GitHub Bot logged work on BEAM-8626:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Jan/20 03:23
            Start Date: 17/Jan/20 03:23
    Worklog Time Spent: 10m 
      Work Description: angoenka commented on pull request #10598: [BEAM-8626] Implement status fn api handler in python sdk
URL: https://github.com/apache/beam/pull/10598#discussion_r367753262
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/worker_status.py
 ##########
 @@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Worker status api handler for reporting SDK harness debug info."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import queue
+import sys
+import threading
+import traceback
+from collections import defaultdict
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
+from apache_beam.runners.worker.worker_id_interceptor import 
WorkerIdInterceptor
+
+
+def thread_dump():
+  # deduplicate threads with same stack trace
+  stack_traces = defaultdict(list)
+  frames = sys._current_frames()  # pylint: disable=protected-access
+
+  for t in threading.enumerate():
+    stack_trace = ''.join(traceback.format_stack(frames[t.ident]))
+    thread_ident_name = (t.ident, t.name)
+    stack_traces[stack_trace].append(thread_ident_name)
+
+  all_traces = ['=' * 10 + 'THREAD DUMP' + '=' * 10]
+  for stack, identity in stack_traces.items():
+    ident, name = identity[0]
+    trace = '--- Thread #%s name: %s %s---\n' % (
+        ident, name, 'and other %d threads' %
+        (len(identity) - 1) if len(identity) > 1 else '')
+    if len(identity) > 1:
+      trace += 'threads: %s\n' % identity
+    trace += stack
+    all_traces.append(trace)
+  all_traces.append('=' * 30)
+  return '\n'.join(x.encode('utf-8') for x in all_traces)
+
+
+def active_processing_bundles_state(bundle_process_cache):
+  active_bundles = ['=' * 10 + 'ACTIVE PROCESSING BUNDLES' + '=' * 10]
+  if not bundle_process_cache.active_bundle_processors:
+    active_bundles.append("No active processing bundles.")
+  else:
+    cache = []
+    for instruction in list(
+        bundle_process_cache.active_bundle_processors.keys()):
+      processor = bundle_process_cache.lookup(instruction)
+      if processor:
+        info = processor.state_sampler.get_info()
+        cache.append((instruction,
+                      processor.process_bundle_descriptor.id,
+                      info.tracked_thread, info.time_since_transition))
+    # reverse sort active bundle by time since last transition, keep top 10.
+    cache.sort(key=lambda x: x[-1], reverse=True)
+    for s in cache[:10]:
+      state = '--- instruction %s ---\n' % s[0]
+      state += 'ProcessBundleDescriptorId: %s\n' % s[1]
+      state += "tracked thread: %s\n" % s[2]
+      state += "time since transition: %.2f seconds\n" % (s[3] / 1e9)
+      active_bundles.append(state)
+
+  active_bundles.append('=' * 30)
+  return '\n'.join(x.encode('utf-8') for x in active_bundles)
+
+
+DONE = object()
+
+
+class FnApiWorkerStatusHandler(object):
+  def __init__(self, status_address, bundle_process_cache=None):
+    self._alive = True
+    self._bundle_process_cache = bundle_process_cache
+    ch = GRPCChannelFactory.insecure_channel(status_address)
+    grpc.channel_ready_future(ch).result(timeout=60)
+    self._status_channel = grpc.intercept_channel(ch, WorkerIdInterceptor())
+    self._status_stub = beam_fn_api_pb2_grpc.BeamFnWorkerStatusStub(
+        self._status_channel)
+    self._responses = queue.Queue()
+    self._server = threading.Thread(target=lambda: self._serve(),
+                                    name='fn_api_status_handler')
+    self._server.daemon = True
+    self._server.start()
+
+  def _get_responses(self):
+    while True:
+      response = self._responses.get()
+      if response is DONE:
+        self._alive = False
+        return
+      yield response
+
+  def _serve(self):
+    while self._alive:
+      for request in self._status_stub.WorkerStatus(self._get_responses()):
+        try:
+          response = self.generate_status_response()
+        except Exception:
+          traceback_string = traceback.format_exc()
+          self._responses.put(
+              beam_fn_api_pb2.WorkerStatusResponse(
+                  id=request.id,
+                  error="Exception encountered while generating "
+                  "status page: %s" % traceback_string))
+          continue
+
+        self._responses.put(
+            beam_fn_api_pb2.WorkerStatusResponse(id=request.id,
+                                                 status_info=response))
 
 Review comment:
   We can remove this if we move addition of response above.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 373369)

> Implement status api handler in python sdk harness
> --------------------------------------------------
>
>                 Key: BEAM-8626
>                 URL: https://issues.apache.org/jira/browse/BEAM-8626
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-harness
>            Reporter: Yichi Zhang
>            Assignee: Yichi Zhang
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to