This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 29f456a85f Bugfix for get_parsing_context() when ran with 
LocalExecutor (#40738)
29f456a85f is described below

commit 29f456a85f217b64862c3ecb94618a8bb83a571f
Author: ErandM50 <[email protected]>
AuthorDate: Tue Jul 23 19:40:06 2024 -0400

    Bugfix for get_parsing_context() when ran with LocalExecutor (#40738)
    
    When running with local executor, get_parsing_context would return None 
regardless of the parsing context. This leads to longer loading times when a 
lot of dynamic DAGs are generated from a single dagfile. Changed 
local_executor.py to execute the task with the _airflow_parsing_context_manager.
---
 airflow/executors/local_executor.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/airflow/executors/local_executor.py 
b/airflow/executors/local_executor.py
index 90cedf7dbd..afa51b1d86 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -40,6 +40,7 @@ from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import PARALLELISM, BaseExecutor
 from airflow.traces.tracer import Trace, span
+from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import TaskInstanceState
 
@@ -91,10 +92,12 @@ class LocalWorkerBase(Process, LoggingMixin):
 
         self.log.info("%s running %s", self.__class__.__name__, command)
         setproctitle(f"airflow worker -- LocalExecutor: {command}")
-        if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
-            state = self._execute_work_in_subprocess(command)
-        else:
-            state = self._execute_work_in_fork(command)
+        dag_id, task_id = 
BaseExecutor.validate_airflow_tasks_run_command(command)
+        with _airflow_parsing_context_manager(dag_id=dag_id, task_id=task_id):
+            if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
+                state = self._execute_work_in_subprocess(command)
+            else:
+                state = self._execute_work_in_fork(command)
 
         self.result_queue.put((key, state))
         # Remove the command since the worker is done executing the task

Reply via email to