kacpermuda commented on code in PR #68708:
URL: https://github.com/apache/airflow/pull/68708#discussion_r3435069697


##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -819,10 +820,44 @@ def _on_task_instance_manual_state_change(
 
     def _execute(self, callable, callable_name: str, use_fork: bool = False):
         if use_fork:
-            self._fork_execute(callable, callable_name)
+            if conf.execute_in_thread():
+                self._thread_execute(callable, callable_name)
+            else:
+                self._fork_execute(callable, callable_name)
         else:
             callable()
 
+    def _thread_execute(self, callable, callable_name: str):
+        """
+        Run OpenLineage event emission in a time-bounded daemon thread.
+
+        Opt-in alternative to :meth:`_fork_execute`, enabled via
+        ``[openlineage] execute_in_thread``. Unlike forking, this never duplicates the
+        task runner process, so the supervisor connection (and every other inherited
+        resource) is left untouched -- a blocked emission can therefore never leave the
+        task stuck in the ``running`` state. Metadata extraction still runs in-process
+        with full access to the task runtime, so Operators whose extractors resolve
+        Connections, Variables or XComs keep working.
+        """
+        thread = threading.Thread(
+            target=callable,
+            name=f"openlineage-{callable_name}",
+            daemon=True,
+        )
+        thread.start()
+        thread.join(timeout=conf.execution_timeout())
+        if thread.is_alive():
+            # Emission is still running. We deliberately do not keep waiting: the thread is

Review Comment:
   **COR.N2** [nit] Correctness: abandoned thread reads live shared process state instead of a fork-time snapshot

   `_fork_execute` runs the callable against a copy-on-write snapshot of the process taken at fork time, so post-fork mutations in the parent cannot affect the in-flight event. The thread path shares the same process memory: after `_thread_execute` returns on timeout and the task runner proceeds (finalizing or closing the supervisor connection or ORM session), the still-running emission thread keeps reading the same live objects and sessions. This can surface as partially built or corrupted events, or as exceptions when a session or socket the thread is using is torn down underneath it (caught by `@print_warning`, so there is no crash, but the event is silently lost). The risk is low because emission closures mostly read immutable locals, but the code comment here ("holding only its own backend connection") understates this semantic difference between forking and threading.
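A minimal sketch of the hazard in plain Python, not Airflow code: `emit`, `thread_execute`, and `task_state` are hypothetical stand-ins, and the sleeps are arbitrary. It mirrors the shape of `_thread_execute` (daemon thread, bounded `join`, then abandon) and shows that a closure over live state observes a mutation the caller makes after giving up, while a closure over a deep copy taken before `start()` does not.

```python
import copy
import threading
import time

# Hypothetical stand-in for mutable task-runner state read during emission.
task_state = {"status": "running"}
seen = {}


def emit(state, key):
    # Simulate a slow backend: the state is read only after the caller's
    # bounded join has already timed out.
    time.sleep(0.5)
    seen[key] = state["status"]


def thread_execute(target, timeout):
    # Same shape as the thread path: daemon thread, bounded join, then abandon.
    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    thread.join(timeout=timeout)
    return thread


# Closure over the live dict: sees whatever the caller does after abandoning it.
live = thread_execute(lambda: emit(task_state, "live"), timeout=0.05)

# Closure over a deep copy taken before start(): approximates a fork-time snapshot.
snapshot = copy.deepcopy(task_state)
copied = thread_execute(lambda: emit(snapshot, "snapshot"), timeout=0.05)

# The task runner moves on and finalizes its state while both threads still run.
task_state["status"] = "success"

live.join()
copied.join()
print(seen["live"], seen["snapshot"])  # success running
```

A snapshot like this only protects plain data; it does nothing for a socket or session that the task runner closes while the abandoned thread is still using it.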



##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -819,10 +820,44 @@ def _on_task_instance_manual_state_change(
 
     def _execute(self, callable, callable_name: str, use_fork: bool = False):
         if use_fork:
-            self._fork_execute(callable, callable_name)
+            if conf.execute_in_thread():
+                self._thread_execute(callable, callable_name)
+            else:
+                self._fork_execute(callable, callable_name)
         else:
             callable()
 
+    def _thread_execute(self, callable, callable_name: str):
+        """
+        Run OpenLineage event emission in a time-bounded daemon thread.
+
+        Opt-in alternative to :meth:`_fork_execute`, enabled via
+        ``[openlineage] execute_in_thread``. Unlike forking, this never duplicates the
+        task runner process, so the supervisor connection (and every other inherited
+        resource) is left untouched -- a blocked emission can therefore never leave the
+        task stuck in the ``running`` state. Metadata extraction still runs in-process
+        with full access to the task runtime, so Operators whose extractors resolve
+        Connections, Variables or XComs keep working.
+        """
+        thread = threading.Thread(
+            target=callable,
+            name=f"openlineage-{callable_name}",
+            daemon=True,
+        )
+        thread.start()
+        thread.join(timeout=conf.execution_timeout())

Review Comment:
   **COR.N1** [nit] Correctness: abandoned emission thread can still block the task runner via the shared `SUPERVISOR_COMMS` lock

   The thread path's stated guarantee is that emission can never block the task runner past `execution_timeout`. However, the emission callable runs `extract_metadata`, which on Airflow 3 resolves Connections, Variables, and XComs through the single shared `SUPERVISOR_COMMS` channel, whose `_thread_lock` spans the entire request/response round trip (`task_sdk/.../comms.py send()`). If the abandoned daemon thread is mid-`send()` when the main task thread next needs the supervisor channel, the main thread blocks on that lock until the abandoned thread's round trip completes. That reintroduces, in thread form, a variant of the supervisor-channel stall the fork path was meant to avoid. Unlike the forked process (which is terminated via `_terminate_with_wait`), the thread is neither isolated from this shared resource nor killable. The likelihood is low in the common slow-backend case (the hang is usually in the adapter's HTTP emit, not under the supervisor lock), so this is a caveat rather than a break, but it qualifies the documented "can never block" claim.
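A minimal sketch of the contention described above, in plain Python. `FakeComms` and the request strings are hypothetical: the class only reproduces the one property the comment cites (a lock held for the full round trip) and is not the task SDK's `comms.py`. Timings are arbitrary.

```python
import threading
import time


class FakeComms:
    """Hypothetical stand-in for a shared supervisor channel (not the task SDK class).

    One lock is held across the whole request/response round trip, so
    concurrent callers from different threads are fully serialized.
    """

    def __init__(self, round_trip_seconds):
        self._thread_lock = threading.Lock()
        self._round_trip_seconds = round_trip_seconds

    def send(self, request):
        with self._thread_lock:
            # Writing the request and waiting for the reply both happen under the lock.
            time.sleep(self._round_trip_seconds)
            return f"response to {request}"


comms = FakeComms(round_trip_seconds=0.5)


def emission():
    # Metadata extraction resolving, e.g., a Connection through the shared channel.
    comms.send("resolve-connection")


thread = threading.Thread(target=emission, daemon=True)
thread.start()
thread.join(timeout=0.1)  # the timeout elapses and the thread is abandoned
assert thread.is_alive()  # still mid-send(), holding the lock

# The main task thread now needs the channel for its own request.
start = time.monotonic()
reply = comms.send("report-final-state")
waited = time.monotonic() - start

# waited is roughly the rest of the abandoned send (~0.4 s) plus its own (0.5 s).
print(reply, f"after {waited:.2f}s")
```

Bounding `join` does not bound how long a later caller waits on the lock: the main thread is delayed by however long the abandoned round trip has left to run.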



-- 
This is an automated message from the Apache Git Service.