This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch release-2.69
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.69 by this push:
     new b48d832da1f [release-2.69] Cherrypick #36518 to the branch. (#36551)
b48d832da1f is described below

commit b48d832da1f4a1d69f5de73620a40bcb9f816955
Author: tvalentyn <[email protected]>
AuthorDate: Mon Oct 20 09:18:50 2025 -0700

    [release-2.69] Cherrypick #36518 to the branch. (#36551)
    
    * Track bundle processors that are pending creation and terminate SDK if creating a BP exceeds a timeout.
    
    * Rename the term
    
    * Remove unnecessary conditions.
    
    * add tests
    
    * Address comments
    
    * Also add a test for logging a lull in process.
---
 .../apache_beam/runners/worker/sdk_worker.py       |   6 +-
 .../apache_beam/runners/worker/worker_status.py    | 102 ++++++++++++++++-----
 .../runners/worker/worker_status_test.py           |  96 ++++++++++++++++++-
 3 files changed, 175 insertions(+), 29 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 0b4c236d6b3..c520740038e 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -454,6 +454,8 @@ class BundleProcessorCache(object):
     )  # type: collections.OrderedDict[str, Exception]
     self.active_bundle_processors = {
     }  # type: Dict[str, Tuple[str, bundle_processor.BundleProcessor]]
+    self.processors_being_created = {
+    }  # type: Dict[str, Tuple[str, threading.Thread, float]]
     self.cached_bundle_processors = collections.defaultdict(
         list)  # type: DefaultDict[str, List[bundle_processor.BundleProcessor]]
     self.last_access_times = collections.defaultdict(
@@ -501,7 +503,8 @@ class BundleProcessorCache(object):
           pass
         return processor
       except IndexError:
-        pass
+        self.processors_being_created[instruction_id] = (
+            bundle_descriptor_id, threading.current_thread(), time.time())
 
     # Make sure we instantiate the processor while not holding the lock.
 
@@ -521,6 +524,7 @@ class BundleProcessorCache(object):
     with self._lock:
       self.active_bundle_processors[
         instruction_id] = bundle_descriptor_id, processor
+      del self.processors_being_created[instruction_id]
       try:
         del self.known_not_running_instruction_ids[instruction_id]
       except KeyError:
diff --git a/sdks/python/apache_beam/runners/worker/worker_status.py b/sdks/python/apache_beam/runners/worker/worker_status.py
index 86a7b5e8ee1..f4102b19389 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status.py
@@ -119,20 +119,21 @@ def _state_cache_stats(state_cache: StateCache) -> str:
   return '\n'.join(cache_stats)
 
 
-def _active_processing_bundles_state(bundle_process_cache):
+def _active_processing_bundles_state(bundle_processor_cache):
   """Gather information about the currently in-processing active bundles.
 
   The result only keeps the longest lasting 10 bundles to avoid excessive
   spamming.
   """
   active_bundles = ['=' * 10 + ' ACTIVE PROCESSING BUNDLES ' + '=' * 10]
-  if not bundle_process_cache.active_bundle_processors:
+  if (not bundle_processor_cache.active_bundle_processors and
+      not bundle_processor_cache.processors_being_created):
     active_bundles.append("No active processing bundles.")
   else:
     cache = []
     for instruction in list(
-        bundle_process_cache.active_bundle_processors.keys()):
-      processor = bundle_process_cache.lookup(instruction)
+        bundle_processor_cache.active_bundle_processors.keys()):
+      processor = bundle_processor_cache.lookup(instruction)
       if processor:
         info = processor.state_sampler.get_info()
         cache.append((
@@ -149,6 +150,18 @@ def _active_processing_bundles_state(bundle_process_cache):
       state += "time since transition: %.2f seconds\n" % (s[3] / 1e9)
       active_bundles.append(state)
 
+    if bundle_processor_cache.processors_being_created:
+      active_bundles.append("Processors being created:\n")
+      current_time = time.time()
+      for instruction, (bundle_id, thread, creation_time) in (
+          bundle_processor_cache.processors_being_created.items()):
+        state = '--- instruction %s ---\n' % instruction
+        state += 'ProcessBundleDescriptorId: %s\n' % bundle_id
+        state += "tracked thread: %s\n" % thread
+        state += "time since creation started: %.2f seconds\n" % (
+            current_time - creation_time)
+        active_bundles.append(state)
+
   active_bundles.append('=' * 30)
   return '\n'.join(active_bundles)
 
@@ -161,7 +174,7 @@ class FnApiWorkerStatusHandler(object):
   def __init__(
       self,
       status_address,
-      bundle_process_cache=None,
+      bundle_processor_cache=None,
       state_cache=None,
       enable_heap_dump=False,
       worker_id=None,
@@ -171,11 +184,11 @@ class FnApiWorkerStatusHandler(object):
 
     Args:
       status_address: The URL Runner uses to host the WorkerStatus server.
-      bundle_process_cache: The BundleProcessor cache dict from sdk worker.
+      bundle_processor_cache: The BundleProcessor cache dict from sdk worker.
       state_cache: The StateCache form sdk worker.
     """
     self._alive = True
-    self._bundle_process_cache = bundle_process_cache
+    self._bundle_processor_cache = bundle_processor_cache
     self._state_cache = state_cache
     ch = GRPCChannelFactory.insecure_channel(status_address)
     grpc.channel_ready_future(ch).result(timeout=60)
@@ -200,7 +213,7 @@ class FnApiWorkerStatusHandler(object):
     self._server.start()
     self._lull_logger = threading.Thread(
         target=lambda: self._log_lull_in_bundle_processor(
-            self._bundle_process_cache),
+            self._bundle_processor_cache),
         name='lull_operation_logger')
     self._lull_logger.daemon = True
     self._lull_logger.start()
@@ -234,9 +247,9 @@ class FnApiWorkerStatusHandler(object):
     if self._state_cache:
       all_status_sections.append(_state_cache_stats(self._state_cache))
 
-    if self._bundle_process_cache:
+    if self._bundle_processor_cache:
       all_status_sections.append(
-          _active_processing_bundles_state(self._bundle_process_cache))
+          _active_processing_bundles_state(self._bundle_processor_cache))
 
     all_status_sections.append(thread_dump())
     if self._enable_heap_dump:
@@ -247,24 +260,64 @@ class FnApiWorkerStatusHandler(object):
   def close(self):
     self._responses.put(DONE, timeout=5)
 
-  def _log_lull_in_bundle_processor(self, bundle_process_cache):
+  def _log_lull_in_bundle_processor(self, bundle_processor_cache):
     while True:
       time.sleep(2 * 60)
-      if bundle_process_cache and bundle_process_cache.active_bundle_processors:
-        for instruction in list(
-            bundle_process_cache.active_bundle_processors.keys()):
-          processor = bundle_process_cache.lookup(instruction)
-          if processor:
-            info = processor.state_sampler.get_info()
-            self._log_lull_sampler_info(info, instruction)
+      if not bundle_processor_cache:
+        continue
+
+      for instruction in list(
+          bundle_processor_cache.active_bundle_processors.keys()):
+        processor = bundle_processor_cache.lookup(instruction)
+        if processor:
+          info = processor.state_sampler.get_info()
+          self._log_lull_sampler_info(info, instruction)
+
+      for instruction, (bundle_id, thread, creation_time) in list(
+          bundle_processor_cache.processors_being_created.items()):
+        self._log_lull_in_creating_bundle_descriptor(
+            instruction, bundle_id, thread, creation_time)
+
+  def _log_lull_in_creating_bundle_descriptor(
+      self, instruction, bundle_id, thread, creation_time):
+    time_since_creation_ns = (time.time() - creation_time) * 1e9
+
+    if (self._element_processing_timeout_ns and
+        time_since_creation_ns > self._element_processing_timeout_ns):
+      stack_trace = self._get_stack_trace(thread)
+      _LOGGER.error((
+          'Creation of bundle processor for instruction %s (bundle %s) '
+          'has exceeded the specified timeout of %.2f minutes. '
+          'This might indicate stuckness in DoFn.setup() or in DoFn creation. '
+          'SDK harness will be terminated.\n'
+          'Current Traceback:\n%s'),
+                    instruction,
+                    bundle_id,
+                    self._element_processing_timeout_ns / 1e9 / 60,
+                    stack_trace)
+      from apache_beam.runners.worker.sdk_worker_main import terminate_sdk_harness
+      terminate_sdk_harness()
+
+    if (time_since_creation_ns > self.log_lull_timeout_ns and
+        self._passed_lull_timeout_since_last_log()):
+      stack_trace = self._get_stack_trace(thread)
+      _LOGGER.warning((
+          'Bundle processor for instruction %s (bundle %s) '
+          'has been creating for at least %.2f seconds.\n'
+          'This might indicate slowness in DoFn.setup() or in DoFn creation. '
+          'Current Traceback:\n%s'),
+                      instruction,
+                      bundle_id,
+                      time_since_creation_ns / 1e9,
+                      stack_trace)
 
   def _log_lull_sampler_info(self, sampler_info, instruction):
     if (not sampler_info or not sampler_info.time_since_transition):
       return
 
     log_lull = (
-        self._passed_lull_timeout_since_last_log() and
-        sampler_info.time_since_transition > self.log_lull_timeout_ns)
+        sampler_info.time_since_transition > self.log_lull_timeout_ns and
+        self._passed_lull_timeout_since_last_log())
     timeout_exceeded = (
         self._element_processing_timeout_ns and
         sampler_info.time_since_transition
@@ -281,7 +334,7 @@ class FnApiWorkerStatusHandler(object):
           ' for PTransform{name=%s, state=%s}' % (step_name, state_name))
     else:
       step_name_log = ''
-    stack_trace = self._get_stack_trace(sampler_info)
+    stack_trace = self._get_stack_trace(sampler_info.tracked_thread)
 
     if timeout_exceeded:
       _LOGGER.error(
@@ -310,10 +363,9 @@ class FnApiWorkerStatusHandler(object):
           stack_trace,
       )
 
-  def _get_stack_trace(self, sampler_info):
-    exec_thread = getattr(sampler_info, 'tracked_thread', None)
-    if exec_thread is not None:
-      thread_frame = _current_frames().get(exec_thread.ident)
+  def _get_stack_trace(self, thread):
+    if thread:
+      thread_frame = _current_frames().get(thread.ident)
       return '\n'.join(
           traceback.format_stack(thread_frame)) if thread_frame else ''
     else:
diff --git a/sdks/python/apache_beam/runners/worker/worker_status_test.py b/sdks/python/apache_beam/runners/worker/worker_status_test.py
index 67df1a324d9..88543258250 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status_test.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status_test.py
@@ -45,7 +45,7 @@ class BeamFnStatusServicer(beam_fn_api_pb2_grpc.BeamFnWorkerStatusServicer):
       self.finished.acquire()
       self.response_received.append(response)
       if len(self.response_received) == self.num_request:
-        self.finished.notifyAll()
+        self.finished.notify_all()
       self.finished.release()
 
 
@@ -63,6 +63,7 @@ class FnApiWorkerStatusHandlerTest(unittest.TestCase):
         self.url, element_processing_timeout_minutes=10)
 
   def tearDown(self):
+    self.fn_status_handler.close()
     self.server.stop(5)
 
   def test_send_status_response(self):
@@ -72,7 +73,6 @@ class FnApiWorkerStatusHandlerTest(unittest.TestCase):
     self.test_status_service.finished.release()
     for response in self.test_status_service.response_received:
       self.assertIsNotNone(response.status_info)
-    self.fn_status_handler.close()
 
   @mock.patch(
       'apache_beam.runners.worker.worker_status'
@@ -85,7 +85,6 @@ class FnApiWorkerStatusHandlerTest(unittest.TestCase):
     self.test_status_service.finished.release()
     for response in self.test_status_service.response_received:
       self.assertIsNotNone(response.error)
-    self.fn_status_handler.close()
 
   def test_log_lull_in_bundle_processor(self):
     def get_state_sampler_info_for_lull(lull_duration_s):
@@ -133,6 +132,97 @@ class FnApiWorkerStatusHandlerTest(unittest.TestCase):
          self.fn_status_handler._log_lull_sampler_info(sampler_info, bundle_id)
           self.assertEqual(flush_mock.call_count, 3)
 
+  def test_lull_logs_emitted_when_creating_bundle_processor_takes_time(self):
+    instruction_id = "instruction-1"
+    bundle_id = "bundle-1"
+    thread = threading.current_thread()
+    now = time.time()
+    creation_time = now
+
+    with (
+        mock.patch('logging.Logger.warning') as warn_mock,
+        mock.patch('logging.Logger.error') as error_mock,
+        mock.patch('time.time') as time_mock,
+        mock.patch(
+            'apache_beam.runners.worker.sdk_worker_main.terminate_sdk_harness',
+        ) as terminate_mock):
+      # Set time to be past the lull timeout
+      time_mock.return_value = (
+          now + self.fn_status_handler.log_lull_timeout_ns / 1e9 + 1)
+      self.fn_status_handler._log_lull_in_creating_bundle_descriptor(
+          instruction_id, bundle_id, thread, creation_time)
+      warn_mock.assert_called_once()
+      args, _ = warn_mock.call_args
+      self.assertIn(
+          'Bundle processor for instruction %s (bundle %s) has been '
+          'creating for at least %.2f seconds',
+          args[0])
+
+      # Set time to be past the element processing timeout
+      time_mock.return_value = (
+          now + self.fn_status_handler._element_processing_timeout_ns / 1e9 + 1)
+
+      self.fn_status_handler._log_lull_in_creating_bundle_descriptor(
+          instruction_id, bundle_id, thread, creation_time)
+
+      error_mock.assert_called_once()
+      args, _ = error_mock.call_args
+      self.assertIn(
+          'Creation of bundle processor for instruction %s (bundle %s) '
+          'has exceeded the specified timeout',
+          args[0])
+
+      terminate_mock.assert_called_once()
+
+  def test_lull_logs_emitted_when_processing_a_bundle_takes_time(self):
+    instruction_id = "instruction-1"
+    now = time.time()
+    thread = threading.current_thread()
+
+    with (
+        mock.patch('logging.Logger.warning') as warn_mock,
+        mock.patch('logging.Logger.error') as error_mock,
+        mock.patch('time.time') as time_mock,
+        mock.patch(
+            'apache_beam.runners.worker.sdk_worker_main.terminate_sdk_harness',
+        ) as terminate_mock):
+      time_mock.return_value = now + 1
+      # Set time to be past the lull timeout
+      sampler_info = statesampler.StateSamplerInfo(
+          state_name=CounterName('test_counter', 'test_stage', 'test_step'),
+          transition_count=1,
+          # Set time to be past the lull timeout
+          time_since_transition=(
+              self.fn_status_handler.log_lull_timeout_ns + 1),
+          tracked_thread=thread)
+      self.fn_status_handler._log_lull_sampler_info(
+          sampler_info, instruction_id)
+      warn_mock.assert_called_once()
+      args, _ = warn_mock.call_args
+      self.assertIn(
+          'Operation ongoing in bundle %s%s for at least %.2f seconds', args[0])
+
+      time_mock.return_value = now + 2
+
+      sampler_info = statesampler.StateSamplerInfo(
+          state_name=CounterName('test_counter', 'test_stage', 'test_step'),
+          transition_count=1,
+          # Set time to be past the element processing timeout
+          time_since_transition=(
+              self.fn_status_handler._element_processing_timeout_ns + 1),
+          tracked_thread=thread)
+      self.fn_status_handler._log_lull_sampler_info(
+          sampler_info, instruction_id)
+
+      error_mock.assert_called_once()
+      args, _ = error_mock.call_args
+      self.assertIn(
+          'Processing of an element in bundle %s%s has exceeded the '
+          'specified timeout of %.2f minutes',
+          args[0])
+
+      terminate_mock.assert_called_once()
+
 
 class HeapDumpTest(unittest.TestCase):
   @mock.patch('apache_beam.runners.worker.worker_status.hpy', None)

Reply via email to