[
https://issues.apache.org/jira/browse/BEAM-3239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16286325#comment-16286325
]
ASF GitHub Bot commented on BEAM-3239:
--------------------------------------
asfgit closed pull request #4178: [BEAM-3239] Adding debug server to sdk worker
to get threaddumps
URL: https://github.com/apache/beam/pull/4178
This is a PR merged from a forked repository.
Because GitHub hides the original diff once a pull request from a fork
(a "foreign" pull request) is merged, the diff is reproduced below for
the sake of provenance:
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 684269eee9b..1db8b29175f 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -14,13 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
"""SDK Fn Harness entry point."""
+import BaseHTTPServer
import json
import logging
import os
import sys
+import threading
import traceback
from google.protobuf import text_format
@@ -34,6 +35,51 @@
# This module is experimental. No backwards-compatibility guarantees.
+class StatusServer(object):
+
+ @classmethod
+ def get_thread_dump(cls):
+ lines = []
+ frames = sys._current_frames() # pylint: disable=protected-access
+
+ for t in threading.enumerate():
+ lines.append('--- Thread #%s name: %s ---\n' % (t.ident, t.name))
+ lines.append(''.join(traceback.format_stack(frames[t.ident])))
+
+ return lines
+
+ def start(self, status_http_port=0):
+ """Executes the serving loop for the status server.
+
+ Args:
+ status_http_port(int): Binding port for the debug server.
+ Default is 0 which means any free unsecured port
+ """
+
+ class StatusHttpHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ """HTTP handler for serving stacktraces of all threads."""
+
+ def do_GET(self): # pylint: disable=invalid-name
+ """Return all thread stacktraces information for GET request."""
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/plain')
+ self.end_headers()
+
+ for line in StatusServer.get_thread_dump():
+ self.wfile.write(line)
+
+ def log_message(self, f, *args):
+ """Do not log any messages."""
+ pass
+
+ self.httpd = httpd = BaseHTTPServer.HTTPServer(
+ ('localhost', status_http_port), StatusHttpHandler)
+ logging.info('Status HTTP server running at %s:%s', httpd.server_name,
+ httpd.server_port)
+
+ httpd.serve_forever()
+
+
def main(unused_argv):
"""Main entry point for SDK Fn Harness."""
if 'LOGGING_API_SERVICE_DESCRIPTOR' in os.environ:
@@ -49,6 +95,12 @@ def main(unused_argv):
else:
fn_log_handler = None
+ # Start status HTTP server thread.
+ thread = threading.Thread(target=StatusServer().start)
+ thread.daemon = True
+ thread.setName('status-server-demon')
+ thread.start()
+
if 'PIPELINE_OPTIONS' in os.environ:
sdk_pipeline_options = json.loads(os.environ['PIPELINE_OPTIONS'])
else:
@@ -89,8 +141,8 @@ def main(unused_argv):
def _load_main_session(semi_persistent_directory):
"""Loads a pickled main session from the path specified."""
if semi_persistent_directory:
- session_file = os.path.join(
- semi_persistent_directory, 'staged', names.PICKLED_MAIN_SESSION_FILE)
+ session_file = os.path.join(semi_persistent_directory, 'staged',
+ names.PICKLED_MAIN_SESSION_FILE)
if os.path.isfile(session_file):
pickler.load_session(session_file)
else:
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
new file mode 100644
index 00000000000..9305c990b10
--- /dev/null
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Tests for apache_beam.runners.worker.sdk_worker_main."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import logging
+import unittest
+
+from apache_beam.runners.worker import sdk_worker_main
+
+
+class SdkWorkerMainTest(unittest.TestCase):
+
+ def test_status_server(self):
+
+ # Wrapping the method to see if it appears in threadump
+ def wrapped_method_for_test():
+ lines = sdk_worker_main.StatusServer.get_thread_dump()
+ threaddump = '\n'.join(lines)
+ self.assertRegexpMatches(threaddump, ".*wrapped_method_for_test.*")
+
+ wrapped_method_for_test()
+
+
+if __name__ == "__main__":
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Enable debug server for python sdk workers
> ------------------------------------------
>
> Key: BEAM-3239
> URL: https://issues.apache.org/jira/browse/BEAM-3239
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Ankur Goenka
> Assignee: Ankur Goenka
> Priority: Minor
>
> Enable a status server that dumps the stack traces of all threads when an HTTP GET request is made to it.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)