This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch release-2.69
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.69 by this push:
new b48d832da1f [release-2.69] Cherrypick #36518 to the branch. (#36551)
b48d832da1f is described below
commit b48d832da1f4a1d69f5de73620a40bcb9f816955
Author: tvalentyn <[email protected]>
AuthorDate: Mon Oct 20 09:18:50 2025 -0700
[release-2.69] Cherrypick #36518 to the branch. (#36551)
* Track bundle processors that are pending creation and terminate SDK if
creating a BP exceeds a timeout.
* Rename the term
* Remove unnecessary conditions.
* add tests
* Address comments
* Also add a test for logging a lull in process.
---
.../apache_beam/runners/worker/sdk_worker.py | 6 +-
.../apache_beam/runners/worker/worker_status.py | 102 ++++++++++++++++-----
.../runners/worker/worker_status_test.py | 96 ++++++++++++++++++-
3 files changed, 175 insertions(+), 29 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 0b4c236d6b3..c520740038e 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -454,6 +454,8 @@ class BundleProcessorCache(object):
) # type: collections.OrderedDict[str, Exception]
self.active_bundle_processors = {
} # type: Dict[str, Tuple[str, bundle_processor.BundleProcessor]]
+ self.processors_being_created = {
+ } # type: Dict[str, Tuple[str, threading.Thread, float]]
self.cached_bundle_processors = collections.defaultdict(
list) # type: DefaultDict[str, List[bundle_processor.BundleProcessor]]
self.last_access_times = collections.defaultdict(
@@ -501,7 +503,8 @@ class BundleProcessorCache(object):
pass
return processor
except IndexError:
- pass
+ self.processors_being_created[instruction_id] = (
+ bundle_descriptor_id, threading.current_thread(), time.time())
# Make sure we instantiate the processor while not holding the lock.
@@ -521,6 +524,7 @@ class BundleProcessorCache(object):
with self._lock:
self.active_bundle_processors[
instruction_id] = bundle_descriptor_id, processor
+ del self.processors_being_created[instruction_id]
try:
del self.known_not_running_instruction_ids[instruction_id]
except KeyError:
diff --git a/sdks/python/apache_beam/runners/worker/worker_status.py
b/sdks/python/apache_beam/runners/worker/worker_status.py
index 86a7b5e8ee1..f4102b19389 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status.py
@@ -119,20 +119,21 @@ def _state_cache_stats(state_cache: StateCache) -> str:
return '\n'.join(cache_stats)
-def _active_processing_bundles_state(bundle_process_cache):
+def _active_processing_bundles_state(bundle_processor_cache):
"""Gather information about the currently in-processing active bundles.
The result only keeps the longest lasting 10 bundles to avoid excessive
spamming.
"""
active_bundles = ['=' * 10 + ' ACTIVE PROCESSING BUNDLES ' + '=' * 10]
- if not bundle_process_cache.active_bundle_processors:
+ if (not bundle_processor_cache.active_bundle_processors and
+ not bundle_processor_cache.processors_being_created):
active_bundles.append("No active processing bundles.")
else:
cache = []
for instruction in list(
- bundle_process_cache.active_bundle_processors.keys()):
- processor = bundle_process_cache.lookup(instruction)
+ bundle_processor_cache.active_bundle_processors.keys()):
+ processor = bundle_processor_cache.lookup(instruction)
if processor:
info = processor.state_sampler.get_info()
cache.append((
@@ -149,6 +150,18 @@ def _active_processing_bundles_state(bundle_process_cache):
state += "time since transition: %.2f seconds\n" % (s[3] / 1e9)
active_bundles.append(state)
+ if bundle_processor_cache.processors_being_created:
+ active_bundles.append("Processors being created:\n")
+ current_time = time.time()
+ for instruction, (bundle_id, thread, creation_time) in (
+ bundle_processor_cache.processors_being_created.items()):
+ state = '--- instruction %s ---\n' % instruction
+ state += 'ProcessBundleDescriptorId: %s\n' % bundle_id
+ state += "tracked thread: %s\n" % thread
+ state += "time since creation started: %.2f seconds\n" % (
+ current_time - creation_time)
+ active_bundles.append(state)
+
active_bundles.append('=' * 30)
return '\n'.join(active_bundles)
@@ -161,7 +174,7 @@ class FnApiWorkerStatusHandler(object):
def __init__(
self,
status_address,
- bundle_process_cache=None,
+ bundle_processor_cache=None,
state_cache=None,
enable_heap_dump=False,
worker_id=None,
@@ -171,11 +184,11 @@ class FnApiWorkerStatusHandler(object):
Args:
status_address: The URL Runner uses to host the WorkerStatus server.
- bundle_process_cache: The BundleProcessor cache dict from sdk worker.
+ bundle_processor_cache: The BundleProcessor cache dict from sdk worker.
state_cache: The StateCache form sdk worker.
"""
self._alive = True
- self._bundle_process_cache = bundle_process_cache
+ self._bundle_processor_cache = bundle_processor_cache
self._state_cache = state_cache
ch = GRPCChannelFactory.insecure_channel(status_address)
grpc.channel_ready_future(ch).result(timeout=60)
@@ -200,7 +213,7 @@ class FnApiWorkerStatusHandler(object):
self._server.start()
self._lull_logger = threading.Thread(
target=lambda: self._log_lull_in_bundle_processor(
- self._bundle_process_cache),
+ self._bundle_processor_cache),
name='lull_operation_logger')
self._lull_logger.daemon = True
self._lull_logger.start()
@@ -234,9 +247,9 @@ class FnApiWorkerStatusHandler(object):
if self._state_cache:
all_status_sections.append(_state_cache_stats(self._state_cache))
- if self._bundle_process_cache:
+ if self._bundle_processor_cache:
all_status_sections.append(
- _active_processing_bundles_state(self._bundle_process_cache))
+ _active_processing_bundles_state(self._bundle_processor_cache))
all_status_sections.append(thread_dump())
if self._enable_heap_dump:
@@ -247,24 +260,64 @@ class FnApiWorkerStatusHandler(object):
def close(self):
self._responses.put(DONE, timeout=5)
- def _log_lull_in_bundle_processor(self, bundle_process_cache):
+ def _log_lull_in_bundle_processor(self, bundle_processor_cache):
while True:
time.sleep(2 * 60)
- if bundle_process_cache and bundle_process_cache.active_bundle_processors:
- for instruction in list(
- bundle_process_cache.active_bundle_processors.keys()):
- processor = bundle_process_cache.lookup(instruction)
- if processor:
- info = processor.state_sampler.get_info()
- self._log_lull_sampler_info(info, instruction)
+ if not bundle_processor_cache:
+ continue
+
+ for instruction in list(
+ bundle_processor_cache.active_bundle_processors.keys()):
+ processor = bundle_processor_cache.lookup(instruction)
+ if processor:
+ info = processor.state_sampler.get_info()
+ self._log_lull_sampler_info(info, instruction)
+
+ for instruction, (bundle_id, thread, creation_time) in list(
+ bundle_processor_cache.processors_being_created.items()):
+ self._log_lull_in_creating_bundle_descriptor(
+ instruction, bundle_id, thread, creation_time)
+
+ def _log_lull_in_creating_bundle_descriptor(
+ self, instruction, bundle_id, thread, creation_time):
+ time_since_creation_ns = (time.time() - creation_time) * 1e9
+
+ if (self._element_processing_timeout_ns and
+ time_since_creation_ns > self._element_processing_timeout_ns):
+ stack_trace = self._get_stack_trace(thread)
+ _LOGGER.error((
+ 'Creation of bundle processor for instruction %s (bundle %s) '
+ 'has exceeded the specified timeout of %.2f minutes. '
+ 'This might indicate stuckness in DoFn.setup() or in DoFn creation. '
+ 'SDK harness will be terminated.\n'
+ 'Current Traceback:\n%s'),
+ instruction,
+ bundle_id,
+ self._element_processing_timeout_ns / 1e9 / 60,
+ stack_trace)
+ from apache_beam.runners.worker.sdk_worker_main import terminate_sdk_harness
+ terminate_sdk_harness()
+
+ if (time_since_creation_ns > self.log_lull_timeout_ns and
+ self._passed_lull_timeout_since_last_log()):
+ stack_trace = self._get_stack_trace(thread)
+ _LOGGER.warning((
+ 'Bundle processor for instruction %s (bundle %s) '
+ 'has been creating for at least %.2f seconds.\n'
+ 'This might indicate slowness in DoFn.setup() or in DoFn creation. '
+ 'Current Traceback:\n%s'),
+ instruction,
+ bundle_id,
+ time_since_creation_ns / 1e9,
+ stack_trace)
def _log_lull_sampler_info(self, sampler_info, instruction):
if (not sampler_info or not sampler_info.time_since_transition):
return
log_lull = (
- self._passed_lull_timeout_since_last_log() and
- sampler_info.time_since_transition > self.log_lull_timeout_ns)
+ sampler_info.time_since_transition > self.log_lull_timeout_ns and
+ self._passed_lull_timeout_since_last_log())
timeout_exceeded = (
self._element_processing_timeout_ns and
sampler_info.time_since_transition
@@ -281,7 +334,7 @@ class FnApiWorkerStatusHandler(object):
' for PTransform{name=%s, state=%s}' % (step_name, state_name))
else:
step_name_log = ''
- stack_trace = self._get_stack_trace(sampler_info)
+ stack_trace = self._get_stack_trace(sampler_info.tracked_thread)
if timeout_exceeded:
_LOGGER.error(
@@ -310,10 +363,9 @@ class FnApiWorkerStatusHandler(object):
stack_trace,
)
- def _get_stack_trace(self, sampler_info):
- exec_thread = getattr(sampler_info, 'tracked_thread', None)
- if exec_thread is not None:
- thread_frame = _current_frames().get(exec_thread.ident)
+ def _get_stack_trace(self, thread):
+ if thread:
+ thread_frame = _current_frames().get(thread.ident)
return '\n'.join(
traceback.format_stack(thread_frame)) if thread_frame else ''
else:
diff --git a/sdks/python/apache_beam/runners/worker/worker_status_test.py
b/sdks/python/apache_beam/runners/worker/worker_status_test.py
index 67df1a324d9..88543258250 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status_test.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status_test.py
@@ -45,7 +45,7 @@ class BeamFnStatusServicer(beam_fn_api_pb2_grpc.BeamFnWorkerStatusServicer):
self.finished.acquire()
self.response_received.append(response)
if len(self.response_received) == self.num_request:
- self.finished.notifyAll()
+ self.finished.notify_all()
self.finished.release()
@@ -63,6 +63,7 @@ class FnApiWorkerStatusHandlerTest(unittest.TestCase):
self.url, element_processing_timeout_minutes=10)
def tearDown(self):
+ self.fn_status_handler.close()
self.server.stop(5)
def test_send_status_response(self):
@@ -72,7 +73,6 @@ class FnApiWorkerStatusHandlerTest(unittest.TestCase):
self.test_status_service.finished.release()
for response in self.test_status_service.response_received:
self.assertIsNotNone(response.status_info)
- self.fn_status_handler.close()
@mock.patch(
'apache_beam.runners.worker.worker_status'
@@ -85,7 +85,6 @@ class FnApiWorkerStatusHandlerTest(unittest.TestCase):
self.test_status_service.finished.release()
for response in self.test_status_service.response_received:
self.assertIsNotNone(response.error)
- self.fn_status_handler.close()
def test_log_lull_in_bundle_processor(self):
def get_state_sampler_info_for_lull(lull_duration_s):
@@ -133,6 +132,97 @@ class FnApiWorkerStatusHandlerTest(unittest.TestCase):
self.fn_status_handler._log_lull_sampler_info(sampler_info,
bundle_id)
self.assertEqual(flush_mock.call_count, 3)
+ def test_lull_logs_emitted_when_creating_bundle_processor_takes_time(self):
+ instruction_id = "instruction-1"
+ bundle_id = "bundle-1"
+ thread = threading.current_thread()
+ now = time.time()
+ creation_time = now
+
+ with (
+ mock.patch('logging.Logger.warning') as warn_mock,
+ mock.patch('logging.Logger.error') as error_mock,
+ mock.patch('time.time') as time_mock,
+ mock.patch(
+ 'apache_beam.runners.worker.sdk_worker_main.terminate_sdk_harness',
+ ) as terminate_mock):
+ # Set time to be past the lull timeout
+ time_mock.return_value = (
+ now + self.fn_status_handler.log_lull_timeout_ns / 1e9 + 1)
+ self.fn_status_handler._log_lull_in_creating_bundle_descriptor(
+ instruction_id, bundle_id, thread, creation_time)
+ warn_mock.assert_called_once()
+ args, _ = warn_mock.call_args
+ self.assertIn(
+ 'Bundle processor for instruction %s (bundle %s) has been '
+ 'creating for at least %.2f seconds',
+ args[0])
+
+ # Set time to be past the element processing timeout
+ time_mock.return_value = (
+ now + self.fn_status_handler._element_processing_timeout_ns / 1e9 + 1)
+
+ self.fn_status_handler._log_lull_in_creating_bundle_descriptor(
+ instruction_id, bundle_id, thread, creation_time)
+
+ error_mock.assert_called_once()
+ args, _ = error_mock.call_args
+ self.assertIn(
+ 'Creation of bundle processor for instruction %s (bundle %s) '
+ 'has exceeded the specified timeout',
+ args[0])
+
+ terminate_mock.assert_called_once()
+
+ def test_lull_logs_emitted_when_processing_a_bundle_takes_time(self):
+ instruction_id = "instruction-1"
+ now = time.time()
+ thread = threading.current_thread()
+
+ with (
+ mock.patch('logging.Logger.warning') as warn_mock,
+ mock.patch('logging.Logger.error') as error_mock,
+ mock.patch('time.time') as time_mock,
+ mock.patch(
+ 'apache_beam.runners.worker.sdk_worker_main.terminate_sdk_harness',
+ ) as terminate_mock):
+ time_mock.return_value = now + 1
+ # Set time to be past the lull timeout
+ sampler_info = statesampler.StateSamplerInfo(
+ state_name=CounterName('test_counter', 'test_stage', 'test_step'),
+ transition_count=1,
+ # Set time to be past the lull timeout
+ time_since_transition=(
+ self.fn_status_handler.log_lull_timeout_ns + 1),
+ tracked_thread=thread)
+ self.fn_status_handler._log_lull_sampler_info(
+ sampler_info, instruction_id)
+ warn_mock.assert_called_once()
+ args, _ = warn_mock.call_args
+ self.assertIn(
+ 'Operation ongoing in bundle %s%s for at least %.2f seconds', args[0])
+
+ time_mock.return_value = now + 2
+
+ sampler_info = statesampler.StateSamplerInfo(
+ state_name=CounterName('test_counter', 'test_stage', 'test_step'),
+ transition_count=1,
+ # Set time to be past the element processing timeout
+ time_since_transition=(
+ self.fn_status_handler._element_processing_timeout_ns + 1),
+ tracked_thread=thread)
+ self.fn_status_handler._log_lull_sampler_info(
+ sampler_info, instruction_id)
+
+ error_mock.assert_called_once()
+ args, _ = error_mock.call_args
+ self.assertIn(
+ 'Processing of an element in bundle %s%s has exceeded the '
+ 'specified timeout of %.2f minutes',
+ args[0])
+
+ terminate_mock.assert_called_once()
+
class HeapDumpTest(unittest.TestCase):
@mock.patch('apache_beam.runners.worker.worker_status.hpy', None)