This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 184ea7f57dd Use attribute based trigger queue detection for triggers
(#61531)
184ea7f57dd is described below
commit 184ea7f57ddfd5338845e637b798c777d586be59
Author: Amogh Desai <[email protected]>
AuthorDate: Sun Feb 8 19:37:07 2026 +0530
Use attribute based trigger queue detection for triggers (#61531)
---
airflow-core/src/airflow/triggers/base.py | 4 ++++
airflow-core/src/airflow/triggers/callback.py | 2 ++
task-sdk/src/airflow/sdk/execution_time/task_runner.py | 8 +++-----
3 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/airflow-core/src/airflow/triggers/base.py
b/airflow-core/src/airflow/triggers/base.py
index 490423da5fd..416558242b8 100644
--- a/airflow-core/src/airflow/triggers/base.py
+++ b/airflow-core/src/airflow/triggers/base.py
@@ -63,6 +63,8 @@ class BaseTrigger(abc.ABC, LoggingMixin):
let them be re-instantiated elsewhere.
"""
+ supports_triggerer_queue: bool = True
+
def __init__(self, **kwargs):
# these values are set by triggerer when preparing to run the instance
# when run, they are injected into logger record.
@@ -134,6 +136,8 @@ class BaseEventTrigger(BaseTrigger):
event-driven scheduling.
"""
+ supports_triggerer_queue: bool = False
+
@staticmethod
def hash(classpath: str, kwargs: dict[str, Any]) -> int:
"""
diff --git a/airflow-core/src/airflow/triggers/callback.py
b/airflow-core/src/airflow/triggers/callback.py
index 336aa232e96..aadfffe38cc 100644
--- a/airflow-core/src/airflow/triggers/callback.py
+++ b/airflow-core/src/airflow/triggers/callback.py
@@ -35,6 +35,8 @@ PAYLOAD_BODY_KEY = "body"
class CallbackTrigger(BaseTrigger):
"""Trigger that executes a callback function asynchronously."""
+ supports_triggerer_queue: bool = False
+
def __init__(self, callback_path: str, callback_kwargs: dict[str, Any] |
None = None):
super().__init__()
self.callback_path = callback_path
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index ba52ca064f6..9fa13a08d36 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -121,8 +121,6 @@ from airflow.sdk.execution_time.sentry import Sentry
from airflow.sdk.execution_time.xcom import XCom
from airflow.sdk.listener import get_listener_manager
from airflow.sdk.timezone import coerce_datetime
-from airflow.triggers.base import BaseEventTrigger
-from airflow.triggers.callback import CallbackTrigger
if TYPE_CHECKING:
import jinja2
@@ -1097,9 +1095,9 @@ def _defer_task(
classpath, trigger_kwargs = defer.trigger.serialize()
queue: str | None = None
# Currently, only task-associated BaseTrigger instances may have a
non-None queue,
- # and only when triggerer.queues_enabled is True.
- if not isinstance(defer.trigger, (BaseEventTrigger, CallbackTrigger)) and
conf.getboolean(
- "triggerer", "queues_enabled", fallback=False
+ # and only when triggerer.queues_enabled conf is True.
+ if conf.getboolean("triggerer", "queues_enabled", fallback=False) and
getattr(
+ defer.trigger, "supports_triggerer_queue", True
):
queue = ti.task.queue