[ https://issues.apache.org/jira/browse/BEAM-5167?focusedWorklogId=176775&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-176775 ]

ASF GitHub Bot logged work on BEAM-5167:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Dec/18 22:20
            Start Date: 18/Dec/18 22:20
    Worklog Time Spent: 10m 
      Work Description: angoenka closed pull request #7248: [BEAM-5167] Ensure monitoring thread does not prevent process exit.
URL: https://github.com/apache/beam/pull/7248

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index c3f2b91693e2..bbc63c2a8d8f 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -77,7 +77,6 @@ def __init__(self, control_address, worker_count, credentials=None,
     self._progress_thread_pool = futures.ThreadPoolExecutor(max_workers=1)
     self._process_thread_pool = futures.ThreadPoolExecutor(
         max_workers=self._worker_count)
-    self._monitoring_thread_pool = futures.ThreadPoolExecutor(max_workers=1)
     self._instruction_id_vs_worker = {}
     self._fns = {}
     self._responses = queue.Queue()
@@ -106,8 +105,6 @@ def run(self):
               fns=self._fns,
               profiler_factory=self._profiler_factory))
 
-    self._monitoring_thread_pool.submit(self._monitor_process_bundle)
-
     def get_responses():
       while True:
         response = self._responses.get()
@@ -115,21 +112,27 @@ def get_responses():
           return
         yield response
 
-    for work_request in control_stub.Control(get_responses()):
-      logging.debug('Got work %s', work_request.instruction_id)
-      request_type = work_request.WhichOneof('request')
-      # Name spacing the request method with 'request_'. The called method
-      # will be like self.request_register(request)
-      getattr(self, SdkHarness.REQUEST_METHOD_PREFIX + request_type)(
-          work_request)
+    self._alive = True
+    monitoring_thread = threading.Thread(target=self._monitor_process_bundle)
+    monitoring_thread.daemon = True
+    monitoring_thread.start()
+
+    try:
+      for work_request in control_stub.Control(get_responses()):
+        logging.debug('Got work %s', work_request.instruction_id)
+        request_type = work_request.WhichOneof('request')
+        # Name spacing the request method with 'request_'. The called method
+        # will be like self.request_register(request)
+        getattr(self, SdkHarness.REQUEST_METHOD_PREFIX + request_type)(
+            work_request)
+    finally:
+      self._alive = False
 
     logging.info('No more requests from control plane')
     logging.info('SDK Harness waiting for in-flight requests to complete')
-    self._alive = False
     # Wait until existing requests are processed.
     self._progress_thread_pool.shutdown()
     self._process_thread_pool.shutdown()
-    self._monitoring_thread_pool.shutdown(wait=False)
     # get_responses may be blocked on responses.get(), but we need to return
     # control to its caller.
     self._responses.put(no_more_work)
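
In the removed version, `self._alive = False` was reached only if the `control_stub.Control(...)` loop finished normally. If that loop raised, the monitoring task (which presumably polls `self._alive`) could keep running, and CPython joins `ThreadPoolExecutor` worker threads at interpreter exit; `shutdown(wait=False)` does not interrupt a task that is already running. The diff addresses both points: the flag is cleared in a `finally` block, and the monitor runs on a daemon thread, which does not block exit. The pattern can be sketched in isolation as below. `MiniHarness`, `_handle`, `handled`, and `poll_interval` are hypothetical names for illustration, not Beam APIs; only the thread lifecycle is modeled.

```python
import logging
import threading
import time


class MiniHarness(object):
  """Hypothetical stand-in for SdkHarness; models only the monitor lifecycle."""

  def __init__(self, poll_interval=0.01):
    self._alive = False
    self._poll_interval = poll_interval  # hypothetical polling period (seconds)
    self.handled = []  # hypothetical record of processed requests
    self.monitor_thread = None

  def _monitor_process_bundle(self):
    # Poll until the control loop clears the flag. Because this runs on a
    # daemon thread, it cannot keep the interpreter alive at exit.
    while self._alive:
      time.sleep(self._poll_interval)

  def _handle(self, work_request):
    # Hypothetical placeholder for the request_* dispatch in SdkHarness.
    self.handled.append(work_request)

  def run(self, work_requests):
    # Set the flag before starting the thread so the monitor sees it as True.
    self._alive = True
    self.monitor_thread = threading.Thread(target=self._monitor_process_bundle)
    self.monitor_thread.daemon = True
    self.monitor_thread.start()
    try:
      for work_request in work_requests:
        logging.debug('Got work %s', work_request)
        self._handle(work_request)
    finally:
      # Runs on normal exhaustion and when the request stream raises,
      # so the monitor loop always terminates on its next poll.
      self._alive = False
```

Calling `MiniHarness().run(['register', 'process_bundle'])` returns once both items are handled, and the monitor thread exits on its next poll. If the iterable raises instead, the exception propagates but the `finally` block still clears the flag.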


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 176775)
    Time Spent: 1h 40m  (was: 1.5h)

> Use concurrency information from SDK Harness in Flink Portable Runner
> ---------------------------------------------------------------------
>
>                 Key: BEAM-5167
>                 URL: https://issues.apache.org/jira/browse/BEAM-5167
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-flink
>            Reporter: Ankur Goenka
>            Assignee: Ankur Goenka
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Based on the discussion at
> [https://lists.apache.org/thread.html/0cbf73d696e0d3a5bb8e93618ac9d6bb81daecf2c9c8e11ee220c8ae@%3Cdev.beam.apache.org%3E],
> use SDK Harness concurrency information in the Flink runner to schedule bundles.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
