[
https://issues.apache.org/jira/browse/BEAM-5167?focusedWorklogId=172954&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172954
]
ASF GitHub Bot logged work on BEAM-5167:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Dec/18 11:40
Start Date: 07/Dec/18 11:40
Worklog Time Spent: 10m
Work Description: robertwb closed pull request #7192: [BEAM-5167] Log
unscheduled process bundle requests
URL: https://github.com/apache/beam/pull/7192
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
Because this pull request originates from a fork, GitHub will not display
its diff after the merge, so the diff is reproduced below:
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 3c7f3e721802..c3f2b91693e2 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -27,6 +27,7 @@
import queue
import sys
import threading
+import time
import traceback
from builtins import object
from builtins import range
@@ -45,9 +46,11 @@
class SdkHarness(object):
REQUEST_METHOD_PREFIX = '_request_'
+ SCHEDULING_DELAY_THRESHOLD_SEC = 5*60 # 5 Minutes
def __init__(self, control_address, worker_count, credentials=None,
profiler_factory=None):
+ self._alive = True
self._worker_count = worker_count
self._worker_index = 0
if credentials is None:
@@ -74,11 +77,12 @@ def __init__(self, control_address, worker_count,
credentials=None,
self._progress_thread_pool = futures.ThreadPoolExecutor(max_workers=1)
self._process_thread_pool = futures.ThreadPoolExecutor(
max_workers=self._worker_count)
+ self._monitoring_thread_pool = futures.ThreadPoolExecutor(max_workers=1)
self._instruction_id_vs_worker = {}
self._fns = {}
self._responses = queue.Queue()
self._process_bundle_queue = queue.Queue()
- self._unscheduled_process_bundle = set()
+ self._unscheduled_process_bundle = {}
logging.info('Initializing SDKHarness with %s workers.',
self._worker_count)
def run(self):
@@ -102,6 +106,8 @@ def run(self):
fns=self._fns,
profiler_factory=self._profiler_factory))
+ self._monitoring_thread_pool.submit(self._monitor_process_bundle)
+
def get_responses():
while True:
response = self._responses.get()
@@ -119,9 +125,11 @@ def get_responses():
logging.info('No more requests from control plane')
logging.info('SDK Harness waiting for in-flight requests to complete')
+ self._alive = False
# Wait until existing requests are processed.
self._progress_thread_pool.shutdown()
self._process_thread_pool.shutdown()
+ self._monitoring_thread_pool.shutdown(wait=False)
# get_responses may be blocked on responses.get(), but we need to return
# control to its caller.
self._responses.put(no_more_work)
@@ -165,7 +173,7 @@ def task():
work = self._process_bundle_queue.get()
# add the instuction_id vs worker map for progress reporting lookup
self._instruction_id_vs_worker[work.instruction_id] = worker
- self._unscheduled_process_bundle.discard(work.instruction_id)
+ self._unscheduled_process_bundle.pop(work.instruction_id, None)
try:
self._execute(lambda: worker.do_instruction(work), work)
finally:
@@ -176,7 +184,7 @@ def task():
# Create a task for each process_bundle request and schedule it
self._process_bundle_queue.put(request)
- self._unscheduled_process_bundle.add(request.instruction_id)
+ self._unscheduled_process_bundle[request.instruction_id] = time.time()
self._process_thread_pool.submit(task)
logging.debug(
"Currently using %s threads." %
len(self._process_thread_pool._threads))
@@ -201,6 +209,28 @@ def task():
self._progress_thread_pool.submit(task)
+ def _monitor_process_bundle(self):
+ """
+ Monitor the unscheduled bundles and log if a bundle is not scheduled for
+ more than SCHEDULING_DELAY_THRESHOLD_SEC.
+ """
+ while self._alive:
+ time.sleep(SdkHarness.SCHEDULING_DELAY_THRESHOLD_SEC)
+ # Check for bundles to be scheduled.
+ if self._unscheduled_process_bundle:
+ current_time = time.time()
+ for instruction_id in self._unscheduled_process_bundle:
+ request_time = None
+ try:
+ request_time = self._unscheduled_process_bundle[instruction_id]
+ except KeyError:
+ pass
+ if request_time:
+ scheduling_delay = current_time - request_time
+ if scheduling_delay > SdkHarness.SCHEDULING_DELAY_THRESHOLD_SEC:
+ logging.warn('Unable to schedule instruction %s for %s',
+ instruction_id, scheduling_delay)
+
class SdkWorker(object):
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 172954)
Time Spent: 50m (was: 40m)
> Use concurrency information from SDK Harness in Flink Portable Runner
> ---------------------------------------------------------------------
>
> Key: BEAM-5167
> URL: https://issues.apache.org/jira/browse/BEAM-5167
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Ankur Goenka
> Assignee: Ankur Goenka
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Based on the discussion
> [https://lists.apache.org/thread.html/0cbf73d696e0d3a5bb8e93618ac9d6bb81daecf2c9c8e11ee220c8ae@%3Cdev.beam.apache.org%3E]
> Use SDK Harness concurrency information in Flink runner to schedule bundles.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)