This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 29f456a85f Bugfix for get_parsing_context() when ran with LocalExecutor (#40738)
29f456a85f is described below
commit 29f456a85f217b64862c3ecb94618a8bb83a571f
Author: ErandM50 <[email protected]>
AuthorDate: Tue Jul 23 19:40:06 2024 -0400
Bugfix for get_parsing_context() when ran with LocalExecutor (#40738)
When running with the local executor, get_parsing_context() would return None
regardless of the parsing context. This led to longer loading times when a
lot of dynamic DAGs are generated from a single dagfile. Changed
local_executor.py to execute the task within the _airflow_parsing_context_manager.
---
airflow/executors/local_executor.py | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 90cedf7dbd..afa51b1d86 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -40,6 +40,7 @@ from airflow import settings
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import PARALLELISM, BaseExecutor
from airflow.traces.tracer import Trace, span
+from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
@@ -91,10 +92,12 @@ class LocalWorkerBase(Process, LoggingMixin):
self.log.info("%s running %s", self.__class__.__name__, command)
setproctitle(f"airflow worker -- LocalExecutor: {command}")
- if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
- state = self._execute_work_in_subprocess(command)
- else:
- state = self._execute_work_in_fork(command)
+        dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(command)
+ with _airflow_parsing_context_manager(dag_id=dag_id, task_id=task_id):
+ if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
+ state = self._execute_work_in_subprocess(command)
+ else:
+ state = self._execute_work_in_fork(command)
self.result_queue.put((key, state))
# Remove the command since the worker is done executing the task