This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2d86d16 Adding debug server to sdk worker to get threaddumps
new d65dea6 This closes #4178
2d86d16 is described below
commit 2d86d168118249d3c8bdcb0907f0a3b08db2eb2c
Author: Ankur Goenka <[email protected]>
AuthorDate: Wed Nov 22 16:54:30 2017 -0800
Adding debug server to sdk worker to get threaddumps
---
.../apache_beam/runners/worker/sdk_worker_main.py | 58 ++++++++++++++++++++--
.../runners/worker/sdk_worker_main_test.py | 44 ++++++++++++++++
2 files changed, 99 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 684269e..1db8b29 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -14,13 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
"""SDK Fn Harness entry point."""
+import BaseHTTPServer
import json
import logging
import os
import sys
+import threading
import traceback
from google.protobuf import text_format
@@ -34,6 +35,51 @@ from apache_beam.runners.worker.sdk_worker import SdkHarness
# This module is experimental. No backwards-compatibility guarantees.
+class StatusServer(object):
+
+ @classmethod
+ def get_thread_dump(cls):
+ lines = []
+ frames = sys._current_frames() # pylint: disable=protected-access
+
+ for t in threading.enumerate():
+ lines.append('--- Thread #%s name: %s ---\n' % (t.ident, t.name))
+ lines.append(''.join(traceback.format_stack(frames[t.ident])))
+
+ return lines
+
+ def start(self, status_http_port=0):
+ """Executes the serving loop for the status server.
+
+ Args:
+ status_http_port(int): Binding port for the debug server.
+ Default is 0 which means any free unsecured port
+ """
+
+ class StatusHttpHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ """HTTP handler for serving stacktraces of all threads."""
+
+ def do_GET(self): # pylint: disable=invalid-name
+ """Return all thread stacktraces information for GET request."""
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/plain')
+ self.end_headers()
+
+ for line in StatusServer.get_thread_dump():
+ self.wfile.write(line)
+
+ def log_message(self, f, *args):
+ """Do not log any messages."""
+ pass
+
+ self.httpd = httpd = BaseHTTPServer.HTTPServer(
+ ('localhost', status_http_port), StatusHttpHandler)
+ logging.info('Status HTTP server running at %s:%s', httpd.server_name,
+ httpd.server_port)
+
+ httpd.serve_forever()
+
+
def main(unused_argv):
"""Main entry point for SDK Fn Harness."""
if 'LOGGING_API_SERVICE_DESCRIPTOR' in os.environ:
@@ -49,6 +95,12 @@ def main(unused_argv):
else:
fn_log_handler = None
+ # Start status HTTP server thread.
+ thread = threading.Thread(target=StatusServer().start)
+ thread.daemon = True
+ thread.setName('status-server-demon')
+ thread.start()
+
if 'PIPELINE_OPTIONS' in os.environ:
sdk_pipeline_options = json.loads(os.environ['PIPELINE_OPTIONS'])
else:
@@ -89,8 +141,8 @@ def main(unused_argv):
def _load_main_session(semi_persistent_directory):
"""Loads a pickled main session from the path specified."""
if semi_persistent_directory:
- session_file = os.path.join(
- semi_persistent_directory, 'staged', names.PICKLED_MAIN_SESSION_FILE)
+ session_file = os.path.join(semi_persistent_directory, 'staged',
+ names.PICKLED_MAIN_SESSION_FILE)
if os.path.isfile(session_file):
pickler.load_session(session_file)
else:
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
new file mode 100644
index 0000000..9305c99
--- /dev/null
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Tests for apache_beam.runners.worker.sdk_worker_main."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import logging
+import unittest
+
+from apache_beam.runners.worker import sdk_worker_main
+
+
+class SdkWorkerMainTest(unittest.TestCase):
+
+ def test_status_server(self):
+
+ # Wrapping the method to see if it appears in threadump
+ def wrapped_method_for_test():
+ lines = sdk_worker_main.StatusServer.get_thread_dump()
+ threaddump = '\n'.join(lines)
+ self.assertRegexpMatches(threaddump, ".*wrapped_method_for_test.*")
+
+ wrapped_method_for_test()
+
+
+if __name__ == "__main__":
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].