kacpermuda commented on code in PR #68708:
URL: https://github.com/apache/airflow/pull/68708#discussion_r3435137674
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -819,10 +820,44 @@ def _on_task_instance_manual_state_change(
def _execute(self, callable, callable_name: str, use_fork: bool = False):
if use_fork:
- self._fork_execute(callable, callable_name)
+ if conf.execute_in_thread():
+ self._thread_execute(callable, callable_name)
+ else:
+ self._fork_execute(callable, callable_name)
else:
callable()
+ def _thread_execute(self, callable, callable_name: str):
+ """
+ Run OpenLineage event emission in a time-bounded daemon thread.
+
+ Opt-in alternative to :meth:`_fork_execute`, enabled via
+ ``[openlineage] execute_in_thread``. Unlike forking, this never duplicates the
+ task runner process, so the supervisor connection (and every other inherited
+ resource) is left untouched -- a blocked emission can therefore never leave the
+ task stuck in the ``running`` state. Metadata extraction still runs in-process
+ with full access to the task runtime, so Operators whose extractors resolve
+ Connections, Variables or XComs keep working.
+ """
+ thread = threading.Thread(
+ target=callable,
+ name=f"openlineage-{callable_name}",
+ daemon=True,
+ )
+ thread.start()
+ thread.join(timeout=conf.execution_timeout())
Review Comment:
Follow-up context from digging into the fork failure history: the
`_thread_lock` in `CommsDecoder.send()` is a **threading** lock (shared across
threads in the same process), so it actually *prevents* concurrent writes from
the main and emission threads, and the framing protocol stays intact. This is
qualitatively different from fork's failure mode, where two processes held the
same socket fd without cross-process synchronisation and could interleave
bytes, producing garbled frames and permanently desynchronising the protocol
(the root cause behind #65714, #66573, #66574 etc. on 3.1.x). The thread
approach's risk here is bounded and non-corrupting: the main thread may wait on
the lock (e.g. if the emission thread stalls mid-send while holding it), but it
cannot corrupt the frame sequence. The "can therefore never leave the task
stuck" overclaim in the docstring still stands as a nit, but this failure mode
is considerably lighter than what it replaces.
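A minimal sketch of why a per-process threading lock keeps framing intact, assuming a simple 4-byte big-endian length-prefix framing. `FramedWriter`, `read_frames` and `demo` are hypothetical names; this is not the actual `CommsDecoder` implementation or its wire format.

```python
import io
import struct
import threading


class FramedWriter:
    """Hypothetical stand-in for a comms channel that writes length-prefixed frames.

    The lock plays the role the review attributes to ``_thread_lock`` in
    ``CommsDecoder.send()``: it serialises whole-frame writes between threads
    of the same process.
    """

    def __init__(self, sink) -> None:
        self._sink = sink
        self._thread_lock = threading.Lock()

    def send(self, payload: bytes) -> None:
        header = struct.pack(">I", len(payload))
        with self._thread_lock:
            # Header and body go out under a single lock acquisition, so no
            # other thread can slip bytes in between them.
            self._sink.write(header)
            self._sink.write(payload)


def read_frames(data: bytes) -> list[bytes]:
    """Decode a stream of length-prefixed frames; garbled framing would break this."""
    frames = []
    offset = 0
    while offset < len(data):
        (length,) = struct.unpack_from(">I", data, offset)
        offset += 4
        frames.append(data[offset:offset + length])
        offset += length
    return frames


def demo(n: int = 200) -> list[bytes]:
    """Two threads (think: main and emission) send n frames each concurrently."""
    sink = io.BytesIO()
    writer = FramedWriter(sink)

    def worker(tag: bytes) -> None:
        for i in range(n):
            writer.send(tag + str(i).encode())

    threads = [threading.Thread(target=worker, args=(tag,)) for tag in (b"main-", b"emit-")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return read_frames(sink.getvalue())
```

The frames from the two threads may be interleaved with each other, but every frame decodes whole. The lock only coordinates threads of one process: after `os.fork()` the child works with its own copy of the lock, so it gave no protection to the socket shared between parent and child in the fork path.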
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -819,10 +820,44 @@ def _on_task_instance_manual_state_change(
def _execute(self, callable, callable_name: str, use_fork: bool = False):
if use_fork:
- self._fork_execute(callable, callable_name)
+ if conf.execute_in_thread():
+ self._thread_execute(callable, callable_name)
+ else:
+ self._fork_execute(callable, callable_name)
else:
callable()
+ def _thread_execute(self, callable, callable_name: str):
+ """
+ Run OpenLineage event emission in a time-bounded daemon thread.
+
+ Opt-in alternative to :meth:`_fork_execute`, enabled via
+ ``[openlineage] execute_in_thread``. Unlike forking, this never duplicates the
+ task runner process, so the supervisor connection (and every other inherited
+ resource) is left untouched -- a blocked emission can therefore never leave the
+ task stuck in the ``running`` state. Metadata extraction still runs in-process
+ with full access to the task runtime, so Operators whose extractors resolve
+ Connections, Variables or XComs keep working.
+ """
+ thread = threading.Thread(
+ target=callable,
+ name=f"openlineage-{callable_name}",
+ daemon=True,
+ )
+ thread.start()
+ thread.join(timeout=conf.execution_timeout())
+ if thread.is_alive():
+ # Emission is still running. We deliberately do not keep waiting: the thread is
Review Comment:
Follow-up context: this finding is independent of the socket-sharing concern
that motivated the switch from fork. Fork's copy-on-write (COW) snapshot also
provided memory isolation, which the thread approach gives up, but the failure
mode here is much lighter: a stale or partially-built event, or an exception
caught by `@print_warning` (emission silently lost, no crash). No protocol
corruption, no stuck TIs. The nit severity stands, but it is worth noting that
this is a much softer failure than the socket-interleaving problem the PR is
fixing.
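A minimal sketch of the time-bounded daemon-thread pattern from the diff and of the memory-sharing trade-off described above, assuming plain `threading`. `warn_on_error` is a hypothetical stand-in for `@print_warning` (the real decorator may behave differently), and `run_bounded`, `demo_stale_read` and `demo_swallowed_error` are illustrative names, not provider APIs.

```python
import functools
import logging
import threading

log = logging.getLogger(__name__)


def warn_on_error(func):
    """Hypothetical stand-in for ``@print_warning``: log and swallow any exception."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            log.warning("OpenLineage emission failed", exc_info=True)
            return None

    return wrapper


def run_bounded(target, name: str, timeout: float) -> threading.Thread:
    """Start ``target`` in a daemon thread and wait at most ``timeout`` seconds."""
    thread = threading.Thread(target=target, name=f"openlineage-{name}", daemon=True)
    thread.start()
    thread.join(timeout=timeout)
    # If the thread is still alive the caller simply moves on; daemon=True means
    # it cannot keep the interpreter alive at exit.
    return thread


def demo_stale_read() -> tuple[bool, list[str]]:
    task_state = {"status": "running"}  # memory shared with the emission thread
    release = threading.Event()
    seen: list[str] = []

    @warn_on_error
    def emit() -> None:
        release.wait()  # simulate slow extraction or transport
        seen.append(task_state["status"])

    thread = run_bounded(emit, "stale-read", timeout=0.05)
    timed_out = thread.is_alive()
    # The caller keeps going and mutates state the thread still reads. A forked
    # child would have kept its own copy-on-write snapshot ("running") instead.
    task_state["status"] = "success"
    release.set()
    thread.join()  # only so the demo's result is observable
    return timed_out, seen


def demo_swallowed_error() -> bool:
    @warn_on_error
    def emit() -> None:
        raise RuntimeError("transport down")

    thread = run_bounded(emit, "failing", timeout=1.0)
    return not thread.is_alive()  # finished cleanly; the error was only logged
```

In `demo_stale_read` the emission thread reports `"success"` even though it was dispatched while the task was `"running"`, which is the stale-event case; in `demo_swallowed_error` the failure is logged and the emission is lost without crashing the caller. The second `join()` in the demo is not part of the pattern itself.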