GitHub user dinjazelena98 added a comment to the discussion: DAG in queued when it has Deferrable

@potiuk, hey, I have been trying to solve this for a week now and have checked everything: trigger logs, the metadata database, scheduler logs. I have absolutely no idea why it gets stuck in queued for the entire runtime of the DAG that it triggered. Here is a simple snippet to reproduce:

```python
import asyncio

from typing import Any, AsyncIterator

import pendulum

from airflow.sdk import dag
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.models.baseoperator import BaseOperator


class DummySleepTrigger(BaseTrigger):
    def __init__(self, sleep_time: int):
        super().__init__()
        self.sleep_time = sleep_time

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return ("dags.dummy.DummySleepTrigger", {"sleep_time": self.sleep_time})

    async def run(self) -> AsyncIterator[TriggerEvent]:
        self.log.info(f"Dummy trigger started, sleeping for {self.sleep_time} seconds.")
        await asyncio.sleep(self.sleep_time)
        self.log.info("Dummy trigger finished sleeping.")
        yield TriggerEvent({"status": "success", "dummy_value": f"slept for {self.sleep_time}"})


class DummySleepOperator(BaseOperator):
    def __init__(self, sleep_time: int, **kwargs) -> None:
        super().__init__(**kwargs)
        self.sleep_time = sleep_time

    def execute(self, context):
        self.defer(trigger=DummySleepTrigger(sleep_time=self.sleep_time), method_name="resumed_method")

    def resumed_method(self, context, event=None):
        self.log.info(f"Resumed! Event details: {event}")
        # Return a dummy dictionary, just like the real operator
        return {"status": "success", "dummy_data": [1, 2, 3]}


@dag(
    dag_id="ingestion_dummy_dag",
    start_date=pendulum.datetime(2025, 1, 1),
)
def ingestion_dummy_dag():
    BashOperator(
        task_id="process_data_dummy",
        bash_command="echo '--- Starting dummy ingestion work ---' && sleep 60 && echo '--- Dummy ingestion work finished ---'",
    )

ingestion_dummy_dag()


@dag(
    dag_id="monitor_dummy_dag",
    start_date=pendulum.datetime(2025, 1, 1),
    schedule="@continuous",
    max_active_runs=1,
)
def monitor_dummy_dag():
    monitor_task = DummySleepOperator(task_id="dummy_monitor_task", sleep_time=10)

    trigger_ingestion_task = TriggerDagRunOperator(
        task_id="trigger_ingestion_dummy_dag",
        trigger_dag_id="ingestion_dummy_dag",
        wait_for_completion=False,
        conf=monitor_task.output
    )

    monitor_task >> trigger_ingestion_task

monitor_dummy_dag()
```

So, looking at this, the monitor DAG should defer (in a real setup it would wait for events; here it just sleeps for demonstration). Once the sleep is done, the scheduler picks up the TriggerEvent, calls method_name, and then the trigger operator triggers the next DAG. Once the ingestion DAG is triggered, the monitor should go back into the deferred state and sleep again, but instead it gets queued and stays there for the entire runtime of the ingestion DAG.
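For clarity, the trigger's async flow can be exercised outside Airflow with plain asyncio. This is a minimal sketch with no Airflow dependency; the function names here are illustrative stand-ins, not Airflow APIs:

```python
import asyncio
from typing import Any, AsyncIterator


# Illustrative stand-in for the trigger's run() coroutine: an async generator
# that sleeps, then yields a single event payload, as DummySleepTrigger does.
async def dummy_sleep_trigger(sleep_time: float) -> AsyncIterator[dict[str, Any]]:
    await asyncio.sleep(sleep_time)
    yield {"status": "success", "dummy_value": f"slept for {sleep_time}"}


async def consume_first_event(sleep_time: float) -> dict[str, Any]:
    # The triggerer resumes the deferred task on the first yielded event.
    async for event in dummy_sleep_trigger(sleep_time):
        return event
    raise RuntimeError("trigger exited without yielding an event")


if __name__ == "__main__":
    print(asyncio.run(consume_first_event(0.01)))
```

The trigger side behaves as expected in isolation, which is why the suspicion falls on what happens after the event is consumed.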

I tested without defer, and it works, so the problem is not in TriggerDagRunOperator.
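Presumably the "without defer" variant just replaces the deferral with a blocking sleep in execute(). A hedged sketch of that variant, with a stub in place of Airflow's BaseOperator so it runs anywhere (in a real DAG you would subclass BaseOperator instead):

```python
import time
from typing import Any


class _StubBaseOperator:
    # Stand-in for airflow.models.baseoperator.BaseOperator so this sketch
    # runs without an Airflow installation.
    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class BlockingSleepOperator(_StubBaseOperator):
    """Synchronous variant of DummySleepOperator: no defer(), so the task
    occupies a worker slot for the whole sleep instead of deferring."""

    def __init__(self, sleep_time: float, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.sleep_time = sleep_time

    def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        time.sleep(self.sleep_time)  # blocks instead of deferring
        return {"status": "success", "dummy_data": [1, 2, 3]}


if __name__ == "__main__":
    op = BlockingSleepOperator(task_id="dummy_monitor_task", sleep_time=0.01)
    print(op.execute(context={}))
```

With this variant the monitor run completes and the next run starts normally, which points at the defer/resume cycle rather than the downstream trigger.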

GitHub link: 
https://github.com/apache/airflow/discussions/51875#discussioncomment-13519004
