This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d1b1f74ab7c fix(openlineage): self-heal ProcessPoolExecutor on BrokenProcessPool (#67400)
d1b1f74ab7c is described below

commit d1b1f74ab7c5f03b87e397d469dda797d59fac0b
Author: Anmol Mishra <[email protected]>
AuthorDate: Sun May 31 03:36:28 2026 +0530

    fix(openlineage): self-heal ProcessPoolExecutor on BrokenProcessPool (#67400)
    
    * fix(openlineage): self-heal ProcessPoolExecutor on BrokenProcessPool
    
    When a child process in the OpenLineage listener's ProcessPoolExecutor
    terminates abruptly, concurrent.futures marks the pool as permanently
    broken. Every subsequent submission raises BrokenProcessPool and lineage
    data stops flowing until the scheduler is restarted.
    
    This adds self-healing: submit_callable now catches BrokenProcessPool,
    shuts down the broken executor, creates a fresh one, and retries the
    submission so lineage reporting recovers automatically.
    
    Closes #67283
    
    * fix(openlineage): sort imports to fix ruff I001 static check
    
    ruff I001: `from concurrent.futures.process import BrokenProcessPool`
    must follow `from concurrent.futures import ProcessPoolExecutor`
    
    * fix(openlineage): collapse log.warning to single line for ruff-format
    
    The warning message fits within the line-length limit so it should not be
    split across three lines.
    
    ---------
    
    Co-authored-by: Anmol Mishra <[email protected]>
---
 .../providers/openlineage/plugins/listener.py      |  9 +++++-
 .../unit/openlineage/plugins/test_listener.py      | 32 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
index 0f4b0f3ca23..2d4d74e828f 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
@@ -20,6 +20,7 @@ import logging
 import os
 import sys
 from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures.process import BrokenProcessPool
 from datetime import datetime
 from functools import cache
 from typing import TYPE_CHECKING
@@ -1049,7 +1050,13 @@ class OpenLineageListener:
             self.log.warning("OpenLineage received exception in method on_dag_run_failed", exc_info=e)
 
     def submit_callable(self, callable, *args, **kwargs):
-        fut = self.executor.submit(callable, *args, **kwargs)
+        try:
+            fut = self.executor.submit(callable, *args, **kwargs)
+        except BrokenProcessPool:
+            self.log.warning("ProcessPoolExecutor is broken; recreating and retrying submission.")
+            self._executor.shutdown(wait=False)
+            self._executor = None
+            fut = self.executor.submit(callable, *args, **kwargs)
         fut.add_done_callback(self.log_submit_error)
         return fut
 
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
index 25bee70ce75..edec29202c0 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
@@ -2128,6 +2128,38 @@ class TestOpenLineageListenerAirflow3:
         listener.log.debug.assert_not_called()
         listener.log.warning.assert_called_once()
 
+    def test_submit_callable_recreates_executor_on_broken_pool(self):
+        """When a child process dies and BrokenProcessPool is raised, the
+        listener should shut down the broken executor, create a fresh one, and
+        retry the submission."""
+        from concurrent.futures.process import BrokenProcessPool
+
+        listener = OpenLineageListener()
+        broken_executor = MagicMock()
+        broken_executor.submit.side_effect = BrokenProcessPool()
+        new_executor = MagicMock()
+        new_future = MagicMock()
+        new_executor.submit.return_value = new_future
+
+        listener._executor = broken_executor
+        listener.log = MagicMock()
+
+        def dummy_callable():
+            pass
+
+        with mock.patch(
+            "airflow.providers.openlineage.plugins.listener.ProcessPoolExecutor",
+            return_value=new_executor,
+        ):
+            fut = listener.submit_callable(dummy_callable, "arg1", kwarg1="val1")
+
+        broken_executor.shutdown.assert_called_once_with(wait=False)
+        new_executor.submit.assert_called_once_with(dummy_callable, "arg1", kwarg1="val1")
+        new_future.add_done_callback.assert_called_once_with(listener.log_submit_error)
+        assert fut is new_future
+        listener.log.warning.assert_called_once()
+        assert "recreating" in listener.log.warning.call_args[0][0]
+
 
 @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow 2 tests")
 class TestOpenLineageSelectiveEnableAirflow2:

Reply via email to