I-am-Uchenna commented on code in PR #67901:
URL: https://github.com/apache/airflow/pull/67901#discussion_r3342839086
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -834,7 +836,17 @@ def _terminate_with_wait(self, process: psutil.Process):
     def _fork_execute(self, callable, callable_name: str):
         self.log.debug("Will fork to execute OpenLineage process.")
-        pid = os.fork()
+        with warnings.catch_warnings():
+            # On Python 3.12+, os.fork() in a multi-threaded process emits a
+            # DeprecationWarning. The fork here is intentional and the child
+            # takes precautions (ORM reconfiguration, os._exit) so the warning
+            # is safe to suppress.
+            warnings.filterwarnings(
+                "ignore",
+                message=".*use of fork\\(\\) may lead to deadlocks in the child",
+                category=DeprecationWarning,
+            )
Review Comment:
Thanks for the context @mobuchowski. I dug deeper into this rather than
falling back on suppressing the warning.
Looking at the code, the "giant rework" you described might actually be more
contained than it seems. The two-phase pattern already exists in
`_on_task_instance_manual_state_change`: it extracts all primitives from ORM
objects in the parent process, then submits a picklable function
(`_emit_manual_state_change_event`) to the `ProcessPoolExecutor`. The task
instance event handlers (running/success/failed/skipped) just need the same
split.
The closures passed to `_fork_execute` do two things in sequence:
1. **Collect data** (extraction + facet building):
`extractor_manager.extract_metadata(...)`, `get_airflow_run_facet(...)`,
date/ID computations -- all of this needs live ORM objects
2. **Build + emit the OL event**:
`adapter.start_task()`/`complete_task()`/`fail_task()` -- at this point
everything is serializable (`OperatorLineage`, facet dicts, string IDs, ISO
timestamps)
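To make the split concrete, here is a minimal sketch using hypothetical stand-ins (`FakeTaskInstance`, `collect_event_args`, `emit_event` are illustrative names, not the provider's code). The "live" object carries a lock, the way real ORM objects carry sessions and connections, so it cannot cross a process boundary; the dict produced by step 1 can.

```python
import pickle
import threading
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a live ORM object (not Airflow's real TaskInstance)."""

    dag_id: str
    task_id: str
    run_id: str
    try_number: int
    start_date: datetime
    # Live ORM objects carry sessions, locks, etc.; this lock makes the object unpicklable.
    session_lock: threading.Lock = field(default_factory=threading.Lock, repr=False)


def is_picklable(obj: object) -> bool:
    """True if `obj` can be pickled, i.e. could be sent to a pool worker."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


def collect_event_args(ti: FakeTaskInstance) -> dict:
    """Step 1, parent process: reduce the live object to plain values."""
    return {
        "dag_id": ti.dag_id,
        "task_id": ti.task_id,
        "run_id": ti.run_id,
        "try_number": ti.try_number,
        "start_time": ti.start_date.isoformat(),
    }


def emit_event(args: dict) -> str:
    """Step 2, pool worker: sees only the primitives produced by step 1."""
    return f"START {args['dag_id']}.{args['task_id']} try={args['try_number']} at {args['start_time']}"
```

The point of the sketch is the boundary: everything ORM-shaped stays on the left of `collect_event_args`, and only its return value is ever handed to the pool.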
The refactor would run step 1 in the parent process (where ORM objects are
available), then submit step 2 to the existing `ProcessPoolExecutor` (with
`forkserver` context) using only the extracted, picklable data. The adapter's
bound methods are already proven to work through the pool (`on_dag_run_running`
passes `self.adapter.dag_started` through `submit_callable`).
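The reason bound methods work through the pool is that a bound method pickles as "this instance plus this attribute name", so it survives as long as the instance itself is picklable. A minimal sketch, with `FakeAdapter` as a hypothetical stand-in for the real adapter and `submit_via_forkserver` as an illustrative helper (not the provider's `submit_callable`):

```python
import multiprocessing
import pickle
from concurrent.futures import ProcessPoolExecutor


class FakeAdapter:
    """Hypothetical stand-in for the OpenLineage adapter: it holds only picklable state."""

    def __init__(self, namespace: str) -> None:
        self.namespace = namespace

    def dag_started(self, dag_id: str, run_id: str) -> str:
        return f"{self.namespace}/{dag_id}/{run_id}: START"


def call_after_pickle_roundtrip(bound_method, *args):
    """Round-trip a bound method the way the pool does before running it in a worker."""
    clone = pickle.loads(pickle.dumps(bound_method))
    return clone(*args)


def submit_via_forkserver(adapter: FakeAdapter, dag_id: str, run_id: str) -> str:
    """Run the bound method in a forkserver-backed pool (POSIX only).

    Call this from an importable module or under an ``if __name__ == "__main__":`` guard,
    because forkserver workers must be able to import the code they unpickle.
    """
    ctx = multiprocessing.get_context("forkserver")
    with ProcessPoolExecutor(max_workers=1, mp_context=ctx) as pool:
        return pool.submit(adapter.dag_started, dag_id, run_id).result()
```

The importability caveat in the docstring is the same constraint `_emit_task_instance_event` would have to meet: it needs to live at module level so workers can resolve it by name.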
This is actually safer than the current fork for the Snowflake-class bugs
you mentioned. `os.fork()` in a multi-threaded process copies locked mutexes
into the child where they can never be released (the holding thread does not
exist in the child). That is the exact deadlock class the Python 3.12 warning
exists for. The `forkserver`-backed pool avoids this entirely by starting
workers from a clean, single-threaded server process.
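The mutex claim is easy to reproduce in isolation. This POSIX-only sketch (an illustration, not provider code) forks while a second thread holds a `threading.Lock`; the child inherits the lock in its locked state, and since the holder thread does not exist on the child's side, a timed `acquire` never succeeds. Without the timeout, this is the deadlock.

```python
import os
import threading
import warnings


def child_sees_lock_held(wait_seconds: float = 1.0) -> bool:
    """Fork while another thread holds a lock; report whether the child finds it stuck.

    POSIX-only. Returns True when the child's timed acquire fails, i.e. it inherited a
    locked mutex whose owning thread does not exist on its side of the fork.
    """
    lock = threading.Lock()
    held = threading.Event()
    release = threading.Event()

    def holder() -> None:
        with lock:
            held.set()      # tell the main thread the lock is now taken
            release.wait()  # keep holding it across the fork

    thread = threading.Thread(target=holder)
    thread.start()
    held.wait()

    with warnings.catch_warnings():
        # Silenced only to keep this demo quiet: on Python 3.12+ this is exactly the
        # "use of fork() may lead to deadlocks in the child" DeprecationWarning.
        warnings.simplefilter("ignore", DeprecationWarning)
        pid = os.fork()

    if pid == 0:
        # Child: only the forking thread survives, so nobody will ever release `lock`.
        acquired = lock.acquire(timeout=wait_seconds)
        os._exit(0 if acquired else 1)

    # Parent: the holder thread still exists here and releases normally.
    release.set()
    thread.join()
    _, status = os.waitpid(pid, 0)
    return os.waitstatus_to_exitcode(status) == 1
```

A user-level lock is used here only because it is easy to observe; the same thing happens to locks held inside C libraries (logging handlers, connection pools, client SDKs), which is where the hard-to-diagnose hangs come from.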
The trade-off: extraction runs in the parent process itself rather than in a
forked child. If an extractor hangs, we can wrap it with the existing `timeout()`
context manager (already used in `before_stopping`) and fall back to an empty
`OperatorLineage`, matching the graceful-degradation behavior of
`_on_task_instance_manual_state_change`. If an extractor corrupts memory, it
affects the parent process, but extractors are read-only metadata operations, and the
fork's "isolation" against memory corruption was never complete since
`os.fork()` shares file descriptors and socket state anyway.
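A sketch of the timeout-plus-fallback shape, under stated assumptions: Airflow's actual `timeout()` helper is not reproduced; `alarm_timeout` is a hypothetical SIGALRM-based stand-in (POSIX, main thread only), and `LineageStub` is a simplified stand-in for `OperatorLineage`.

```python
import signal
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Callable, Iterator


@dataclass
class LineageStub:
    """Hypothetical, simplified stand-in for OperatorLineage."""

    inputs: list[str] = field(default_factory=list)
    outputs: list[str] = field(default_factory=list)


class ExtractionTimeout(Exception):
    pass


@contextmanager
def alarm_timeout(seconds: float) -> Iterator[None]:
    """SIGALRM-based timeout (POSIX, main thread only); a stand-in for Airflow's timeout()."""

    def _on_alarm(signum, frame):
        raise ExtractionTimeout(f"extraction exceeded {seconds}s")

    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, previous)  # restore whatever handler was there


def extract_with_fallback(extract: Callable[[], LineageStub], seconds: float) -> LineageStub:
    """Run `extract` under a deadline; on timeout, emit the event without lineage."""
    try:
        with alarm_timeout(seconds):
            return extract()
    except ExtractionTimeout:
        # Graceful degradation instead of blocking the task.
        return LineageStub()
```

One caveat with any signal-based timeout: Python-level handlers run between bytecodes, so an extractor stuck inside a C call that never returns to the interpreter may not be interrupted promptly.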
I can push this refactor if you think the direction is right. The scope
would be:
- Refactor `_on_task_instance_running`, `_on_task_instance_success`,
`_on_task_instance_failed`, and `_on_task_instance_skipped` to extract metadata
and build facets in the parent, then `submit_callable` with picklable args
- Add a module-level `_emit_task_instance_event` function (mirroring
`_emit_manual_state_change_event`)
- Remove `_fork_execute`, `_terminate_with_wait`, and the `_execute` fork
dispatch
- Keep the `forkserver` mp_context on `ProcessPoolExecutor`
One thing I'd want to verify during implementation: that `OperatorLineage`
(the extraction result) pickles cleanly across the pool boundary. It should,
since it is designed for JSON serialization via `Serde.to_json()`, but I would
add a defensive fallback to empty `OperatorLineage()` if pickling fails.
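The defensive fallback could be as small as this sketch (`LineageStub` is again a hypothetical stand-in for `OperatorLineage`). A `pickle.dumps` check catches the common failure of an unpicklable attribute in the parent; it does not prove every class involved is importable in the worker.

```python
import logging
import pickle
from dataclasses import dataclass, field
from typing import Any

log = logging.getLogger(__name__)


@dataclass
class LineageStub:
    """Hypothetical, simplified stand-in for OperatorLineage."""

    inputs: list[Any] = field(default_factory=list)
    outputs: list[Any] = field(default_factory=list)


def ensure_picklable(lineage: LineageStub) -> LineageStub:
    """Return `lineage` unchanged if it pickles, else an empty stand-in."""
    try:
        pickle.dumps(lineage)
    except Exception:  # TypeError, AttributeError, PicklingError, RecursionError, ...
        log.warning("Extracted lineage is not picklable; sending empty lineage", exc_info=True)
        return LineageStub()
    return lineage
```

The broad `except` is deliberate: depending on the offending object and the Python version, pickling failures surface as several different exception types.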