[
https://issues.apache.org/jira/browse/BEAM-5167?focusedWorklogId=176776&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-176776
]
ASF GitHub Bot logged work on BEAM-5167:
----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Dec/18 22:20
Start Date: 18/Dec/18 22:20
Worklog Time Spent: 10m
Work Description: angoenka closed pull request #7310: [BEAM-5167] Use
daemon thread instead of thread pool for monitoring
URL: https://github.com/apache/beam/pull/7310
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 00cdf856109d..fb1c8d85e40b 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -79,7 +79,6 @@ def __init__(
self._progress_thread_pool = futures.ThreadPoolExecutor(max_workers=1)
self._process_thread_pool = futures.ThreadPoolExecutor(
max_workers=self._worker_count)
- self._monitoring_thread_pool = futures.ThreadPoolExecutor(max_workers=1)
self._instruction_id_vs_worker = {}
self._fns = {}
self._responses = queue.Queue()
@@ -108,8 +107,6 @@ def run(self):
fns=self._fns,
profiler_factory=self._profiler_factory))
- self._monitoring_thread_pool.submit(self._monitor_process_bundle)
-
def get_responses():
while True:
response = self._responses.get()
@@ -117,21 +114,26 @@ def get_responses():
return
yield response
- for work_request in control_stub.Control(get_responses()):
- logging.debug('Got work %s', work_request.instruction_id)
- request_type = work_request.WhichOneof('request')
- # Name spacing the request method with 'request_'. The called method
- # will be like self.request_register(request)
- getattr(self, SdkHarness.REQUEST_METHOD_PREFIX + request_type)(
- work_request)
+ monitoring_thread = threading.Thread(target=self._monitor_process_bundle)
+ monitoring_thread.setDaemon(True)
+ monitoring_thread.start()
+
+ try:
+ for work_request in control_stub.Control(get_responses()):
+ logging.debug('Got work %s', work_request.instruction_id)
+ request_type = work_request.WhichOneof('request')
+ # Name spacing the request method with 'request_'. The called method
+ # will be like self.request_register(request)
+ getattr(self, SdkHarness.REQUEST_METHOD_PREFIX + request_type)(
+ work_request)
+ finally:
+ self._alive = False
logging.info('No more requests from control plane')
logging.info('SDK Harness waiting for in-flight requests to complete')
- self._alive = False
# Wait until existing requests are processed.
self._progress_thread_pool.shutdown()
self._process_thread_pool.shutdown()
- self._monitoring_thread_pool.shutdown(wait=False)
# get_responses may be blocked on responses.get(), but we need to return
# control to its caller.
self._responses.put(no_more_work)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 176776)
Time Spent: 1h 50m (was: 1h 40m)
> Use concurrency information from SDK Harness in Flink Portable Runner
> ---------------------------------------------------------------------
>
> Key: BEAM-5167
> URL: https://issues.apache.org/jira/browse/BEAM-5167
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Ankur Goenka
> Assignee: Ankur Goenka
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Based on the discussion at
> [https://lists.apache.org/thread.html/0cbf73d696e0d3a5bb8e93618ac9d6bb81daecf2c9c8e11ee220c8ae@%3Cdev.beam.apache.org%3E],
> use SDK Harness concurrency information in the Flink runner to schedule bundles.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)