This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d86d16  Adding debug server to sdk worker to get threaddumps
     new d65dea6  This closes #4178
2d86d16 is described below

commit 2d86d168118249d3c8bdcb0907f0a3b08db2eb2c
Author: Ankur Goenka <[email protected]>
AuthorDate: Wed Nov 22 16:54:30 2017 -0800

    Adding debug server to sdk worker to get threaddumps
---
 .../apache_beam/runners/worker/sdk_worker_main.py  | 58 ++++++++++++++++++++--
 .../runners/worker/sdk_worker_main_test.py         | 44 ++++++++++++++++
 2 files changed, 99 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 684269e..1db8b29 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -14,13 +14,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 """SDK Fn Harness entry point."""
 
+import BaseHTTPServer
 import json
 import logging
 import os
 import sys
+import threading
 import traceback
 
 from google.protobuf import text_format
@@ -34,6 +35,51 @@ from apache_beam.runners.worker.sdk_worker import SdkHarness
 # This module is experimental. No backwards-compatibility guarantees.
 
 
+class StatusServer(object):
+
+  @classmethod
+  def get_thread_dump(cls):
+    lines = []
+    frames = sys._current_frames()  # pylint: disable=protected-access
+
+    for t in threading.enumerate():
+      lines.append('--- Thread #%s name: %s ---\n' % (t.ident, t.name))
+      lines.append(''.join(traceback.format_stack(frames[t.ident])))
+
+    return lines
+
+  def start(self, status_http_port=0):
+    """Executes the serving loop for the status server.
+
+    Args:
+      status_http_port(int): Binding port for the debug server.
+        Default is 0 which means any free unsecured port
+    """
+
+    class StatusHttpHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+      """HTTP handler for serving stacktraces of all threads."""
+
+      def do_GET(self):  # pylint: disable=invalid-name
+        """Return all thread stacktraces information for GET request."""
+        self.send_response(200)
+        self.send_header('Content-Type', 'text/plain')
+        self.end_headers()
+
+        for line in StatusServer.get_thread_dump():
+          self.wfile.write(line)
+
+      def log_message(self, f, *args):
+        """Do not log any messages."""
+        pass
+
+    self.httpd = httpd = BaseHTTPServer.HTTPServer(
+        ('localhost', status_http_port), StatusHttpHandler)
+    logging.info('Status HTTP server running at %s:%s', httpd.server_name,
+                 httpd.server_port)
+
+    httpd.serve_forever()
+
+
 def main(unused_argv):
   """Main entry point for SDK Fn Harness."""
   if 'LOGGING_API_SERVICE_DESCRIPTOR' in os.environ:
@@ -49,6 +95,12 @@ def main(unused_argv):
   else:
     fn_log_handler = None
 
+  # Start status HTTP server thread.
+  thread = threading.Thread(target=StatusServer().start)
+  thread.daemon = True
+  thread.setName('status-server-demon')
+  thread.start()
+
   if 'PIPELINE_OPTIONS' in os.environ:
     sdk_pipeline_options = json.loads(os.environ['PIPELINE_OPTIONS'])
   else:
@@ -89,8 +141,8 @@ def main(unused_argv):
 def _load_main_session(semi_persistent_directory):
   """Loads a pickled main session from the path specified."""
   if semi_persistent_directory:
-    session_file = os.path.join(
-        semi_persistent_directory, 'staged', names.PICKLED_MAIN_SESSION_FILE)
+    session_file = os.path.join(semi_persistent_directory, 'staged',
+                                names.PICKLED_MAIN_SESSION_FILE)
     if os.path.isfile(session_file):
       pickler.load_session(session_file)
     else:
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
new file mode 100644
index 0000000..9305c99
--- /dev/null
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Tests for apache_beam.runners.worker.sdk_worker_main."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import logging
+import unittest
+
+from apache_beam.runners.worker import sdk_worker_main
+
+
+class SdkWorkerMainTest(unittest.TestCase):
+
+  def test_status_server(self):
+
+    # Wrapping the method to see if it appears in threadump
+    def wrapped_method_for_test():
+      lines = sdk_worker_main.StatusServer.get_thread_dump()
+      threaddump = '\n'.join(lines)
+      self.assertRegexpMatches(threaddump, ".*wrapped_method_for_test.*")
+
+    wrapped_method_for_test()
+
+
+if __name__ == "__main__":
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to