This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 184ea7f57dd Use attribute based trigger queue detection for triggers (#61531)
184ea7f57dd is described below

commit 184ea7f57ddfd5338845e637b798c777d586be59
Author: Amogh Desai <[email protected]>
AuthorDate: Sun Feb 8 19:37:07 2026 +0530

    Use attribute based trigger queue detection for triggers (#61531)
---
 airflow-core/src/airflow/triggers/base.py              | 4 ++++
 airflow-core/src/airflow/triggers/callback.py          | 2 ++
 task-sdk/src/airflow/sdk/execution_time/task_runner.py | 8 +++-----
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/airflow-core/src/airflow/triggers/base.py b/airflow-core/src/airflow/triggers/base.py
index 490423da5fd..416558242b8 100644
--- a/airflow-core/src/airflow/triggers/base.py
+++ b/airflow-core/src/airflow/triggers/base.py
@@ -63,6 +63,8 @@ class BaseTrigger(abc.ABC, LoggingMixin):
     let them be re-instantiated elsewhere.
     """
 
+    supports_triggerer_queue: bool = True
+
     def __init__(self, **kwargs):
         # these values are set by triggerer when preparing to run the instance
         # when run, they are injected into logger record.
@@ -134,6 +136,8 @@ class BaseEventTrigger(BaseTrigger):
     event-driven scheduling.
     """
 
+    supports_triggerer_queue: bool = False
+
     @staticmethod
     def hash(classpath: str, kwargs: dict[str, Any]) -> int:
         """
diff --git a/airflow-core/src/airflow/triggers/callback.py b/airflow-core/src/airflow/triggers/callback.py
index 336aa232e96..aadfffe38cc 100644
--- a/airflow-core/src/airflow/triggers/callback.py
+++ b/airflow-core/src/airflow/triggers/callback.py
@@ -35,6 +35,8 @@ PAYLOAD_BODY_KEY = "body"
 class CallbackTrigger(BaseTrigger):
     """Trigger that executes a callback function asynchronously."""
 
+    supports_triggerer_queue: bool = False
+
     def __init__(self, callback_path: str, callback_kwargs: dict[str, Any] | None = None):
         super().__init__()
         self.callback_path = callback_path
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index ba52ca064f6..9fa13a08d36 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -121,8 +121,6 @@ from airflow.sdk.execution_time.sentry import Sentry
 from airflow.sdk.execution_time.xcom import XCom
 from airflow.sdk.listener import get_listener_manager
 from airflow.sdk.timezone import coerce_datetime
-from airflow.triggers.base import BaseEventTrigger
-from airflow.triggers.callback import CallbackTrigger
 
 if TYPE_CHECKING:
     import jinja2
@@ -1097,9 +1095,9 @@ def _defer_task(
     classpath, trigger_kwargs = defer.trigger.serialize()
     queue: str | None = None
     # Currently, only task-associated BaseTrigger instances may have a non-None queue,
-    # and only when triggerer.queues_enabled is True.
-    if not isinstance(defer.trigger, (BaseEventTrigger, CallbackTrigger)) and conf.getboolean(
-        "triggerer", "queues_enabled", fallback=False
+    # and only when triggerer.queues_enabled conf is True.
+    if conf.getboolean("triggerer", "queues_enabled", fallback=False) and getattr(
+        defer.trigger, "supports_triggerer_queue", True
     ):
         queue = ti.task.queue
 

Reply via email to