This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fbcee8d01b fix: scheduler crashing with OL provider on airflow standalone (#40353)
fbcee8d01b is described below
commit fbcee8d01bddd100d9335404796a40247a6c6487
Author: Kacper Muda <[email protected]>
AuthorDate: Fri Jun 21 15:46:10 2024 +0200
fix: scheduler crashing with OL provider on airflow standalone (#40353)
Signed-off-by: Kacper Muda <[email protected]>
---
airflow/providers/openlineage/plugins/listener.py | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
diff --git a/airflow/providers/openlineage/plugins/listener.py
b/airflow/providers/openlineage/plugins/listener.py
index 12ebe7c6e6..43553e8ba9 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -62,6 +62,18 @@ def _get_try_number_success(val):
return val.try_number - 1
def _executor_initializer():
    """
    Set up each worker process of the DagRun listener's process pool.

    The ORM is reconfigured via ``settings.configure_orm()`` to avoid the
    problems that show up when several processes talk to the Airflow
    database.

    This callable is given to ``ProcessPoolExecutor`` as its ``initializer``.
    It is defined at module level, rather than as a nested or local function,
    because the initializer has to be picklable.
    """
    settings.configure_orm()
+
+
class OpenLineageListener:
"""OpenLineage listener sends events on task instance and dag run starts,
completes and failures."""
@@ -366,16 +378,10 @@ class OpenLineageListener:
@property
def executor(self) -> ProcessPoolExecutor:
    """
    Return the process pool used by the DagRun listener, creating it lazily.

    On first access, a ``ProcessPoolExecutor`` is built with
    ``conf.dag_state_change_process_pool_size()`` workers and stored on
    ``self._executor``. Later accesses return that same instance.
    """
    if not self._executor:
        self._executor = ProcessPoolExecutor(
            max_workers=conf.dag_state_change_process_pool_size(),
            # Pass the function object itself, not the result of calling it.
            # ProcessPoolExecutor invokes ``initializer`` at the start of each
            # worker process. Calling it here would run it only once in the
            # current process and hand ``None`` to the pool, so workers would
            # never reconfigure the ORM.
            initializer=_executor_initializer,
        )
    return self._executor