dabla commented on code in PR #63489:
URL: https://github.com/apache/airflow/pull/63489#discussion_r2946005385
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -286,6 +286,57 @@ def submit_event(cls, trigger_id, event: TriggerEvent, session: Session = NEW_SE
         if trigger.callback:
             trigger.callback.handle_event(event, session)
+    @classmethod
+    @provide_session
+    def return_to_worker(cls, task_instance: TaskInstance, session: Session) -> None:
+        """
+        Return a task instance to the worker for execution.
+
+        This is used when a trigger fires an event and we need to resume a deferred task instance.
+        It is optimized for configured executor types to directly enqueue to a worker without going
+        through the scheduler, but can also be used as a general utility to return a task instance
+        to the worker for execution.
+        """
+        from airflow.executors import workloads
+        from airflow.executors.executor_loader import ExecutorLoader
+
+        # Remove ourselves as its trigger
+        task_instance.trigger_id = None
+        task_instance.scheduled_dttm = timezone.utcnow()
+
+        # Check which executors are configured for direct queueing
+        direct_queueing_allowlist = conf.getlist("triggerer", "direct_queueing_executors", fallback=[])
Review Comment:
Same remark here, I would expect the config not to change during runtime.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3130,40 +3130,14 @@ def _try_to_load_executor(
         :param team_name: Optional pre-resolved team name. If NOTSET and multi-team is enabled,
                           will query the database to resolve team name. None indicates global team.
         """
-        executor = None
         if conf.getboolean("core", "multi_team"):
Review Comment:
This was not changed in this PR, but I'm wondering why this wasn't defined as a constant at the top of the module? As it is now, the value is retrieved from the configuration (and thus incurs lookups) on every call, while I would expect it not to change during runtime?
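A minimal sketch of the suggestion, assuming the setting really is static for the lifetime of the process (the `_Conf` stub and function body here are illustrative stand-ins, not the actual Airflow code):

```python
# Stand-in for airflow.configuration.conf, instrumented to count lookups.
class _Conf:
    def __init__(self, values):
        self._values = values
        self.lookups = 0

    def getboolean(self, section, key):
        self.lookups += 1
        return self._values[(section, key)]


conf = _Conf({("core", "multi_team"): True})

# Read once at module import time; the setting is not expected to change at runtime.
MULTI_TEAM_ENABLED = conf.getboolean("core", "multi_team")


def try_to_load_executor() -> str:
    # Uses the module-level constant instead of a per-call config lookup.
    if MULTI_TEAM_ENABLED:
        return "team-aware path"
    return "global path"
```

However many times `try_to_load_executor()` runs, the configuration is only consulted once, at import.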
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -286,6 +286,57 @@ def submit_event(cls, trigger_id, event: TriggerEvent, session: Session = NEW_SE
         if trigger.callback:
             trigger.callback.handle_event(event, session)
+    @classmethod
+    @provide_session
+    def return_to_worker(cls, task_instance: TaskInstance, session: Session) -> None:
Review Comment:
Nit: maybe init session by default with NEW_SESSION?
`def return_to_worker(cls, task_instance: TaskInstance, session: Session = NEW_SESSION) -> None:`
##########
airflow-core/src/airflow/executors/executor_loader.py:
##########
@@ -311,6 +311,45 @@ def init_executors(cls) -> list[BaseExecutor]:
         return loaded_executors
+    @classmethod
+    def find_executor(
+        cls,
+        executors: list[BaseExecutor],
+        executor_name: str | None,
+        team_name: str | None,
+    ) -> BaseExecutor | None:
+        """
+        Find the executor matching the given name and team from a list of executor instances.
+
+        :param executors: List of available executor instances to search through.
+        :param executor_name: The executor name (alias, module path, or class name) to find.
+                              Pass None to get the default executor for the given team.
+        :param team_name: The team to find the executor for (None for global executors).
+        :return: The matching executor instance, or None if not found.
+        """
+        if executor_name is None:
Review Comment:
This seems to be duplicated:
```
if executor_name is None:
    if not team_name:
        # No team specified, return the global default (first in list)
        return executors[0] if executors else None
    # Find the default executor for the given team
    for _executor in executors:
        if _executor.team_name == team_name:
            return _executor
    # No team-specific executor found, fall back to global default
    return executors[0] if executors else None
```
I would write it like this:
```
if executor_name is None:
    if team_name:
        # Find the default executor for the given team
        for _executor in executors:
            if _executor.team_name == team_name:
                return _executor
    # No team-specific executor found, fall back to global default
    return executors[0] if executors else None
```
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -286,6 +286,57 @@ def submit_event(cls, trigger_id, event: TriggerEvent, session: Session = NEW_SE
         if trigger.callback:
             trigger.callback.handle_event(event, session)
+    @classmethod
+    @provide_session
+    def return_to_worker(cls, task_instance: TaskInstance, session: Session) -> None:
+        """
+        Return a task instance to the worker for execution.
+
+        This is used when a trigger fires an event and we need to resume a deferred task instance.
+        It is optimized for configured executor types to directly enqueue to a worker without going
+        through the scheduler, but can also be used as a general utility to return a task instance
+        to the worker for execution.
+        """
+        from airflow.executors import workloads
+        from airflow.executors.executor_loader import ExecutorLoader
+
+        # Remove ourselves as its trigger
+        task_instance.trigger_id = None
+        task_instance.scheduled_dttm = timezone.utcnow()
+
+        # Check which executors are configured for direct queueing
+        direct_queueing_allowlist = conf.getlist("triggerer", "direct_queueing_executors", fallback=[])
+
+        if direct_queueing_allowlist:
+            # Resolve team name for multi-team setups
+            team_name: str | None = None
+            if conf.getboolean("core", "multi_team"):
+                team_name = task_instance.dag_model.get_team_name(task_instance.dag_id, session=session)
+
+            # Load configured executors (cached) and find the one responsible for this task instance
+            @cache
Review Comment:
I don't think the @cache decorator makes sense here, as the function is recreated each time the return_to_worker() method is invoked. Either we remove the @cache, or we move the function outside and cache it.
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -286,6 +286,57 @@ def submit_event(cls, trigger_id, event: TriggerEvent, session: Session = NEW_SE
         if trigger.callback:
             trigger.callback.handle_event(event, session)
+    @classmethod
+    @provide_session
+    def return_to_worker(cls, task_instance: TaskInstance, session: Session) -> None:
+        """
+        Return a task instance to the worker for execution.
+
+        This is used when a trigger fires an event and we need to resume a deferred task instance.
+        It is optimized for configured executor types to directly enqueue to a worker without going
+        through the scheduler, but can also be used as a general utility to return a task instance
+        to the worker for execution.
+        """
+        from airflow.executors import workloads
+        from airflow.executors.executor_loader import ExecutorLoader
+
+        # Remove ourselves as its trigger
+        task_instance.trigger_id = None
+        task_instance.scheduled_dttm = timezone.utcnow()
+
+        # Check which executors are configured for direct queueing
+        direct_queueing_allowlist = conf.getlist("triggerer", "direct_queueing_executors", fallback=[])
+
+        if direct_queueing_allowlist:
+            # Resolve team name for multi-team setups
+            team_name: str | None = None
+            if conf.getboolean("core", "multi_team"):
Review Comment:
Same remark here, I would expect the config not to change during runtime.
--
This is an automated message from the Apache Git Service.