GitHub user dinjazelena98 added a comment to the discussion: DAG in queued when
it has Deferrable
@potiuk, hey, I have been trying to solve this for a week now. I checked
everything: trigger logs, the metadata database, scheduler logs. I have
absolutely no idea why it gets stuck in queued for the entire runtime of the
DAG that it triggered. Here is a simple snippet to reproduce:
```python
import asyncio
from typing import Any, AsyncIterator

import pendulum

from airflow.sdk import dag
from airflow.operators.bash import BashOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.models.baseoperator import BaseOperator


class DummySleepTrigger(BaseTrigger):
    def __init__(self, sleep_time: int):
        super().__init__()
        self.sleep_time = sleep_time

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return ("dags.dummy.DummySleepTrigger", {"sleep_time": self.sleep_time})

    async def run(self) -> AsyncIterator[TriggerEvent]:
        self.log.info(f"Dummy trigger started, sleeping for {self.sleep_time} seconds.")
        await asyncio.sleep(self.sleep_time)
        self.log.info("Dummy trigger finished sleeping.")
        yield TriggerEvent({"status": "success", "dummy_value": f"slept for {self.sleep_time}"})


class DummySleepOperator(BaseOperator):
    def __init__(self, sleep_time: int, **kwargs) -> None:
        super().__init__(**kwargs)
        self.sleep_time = sleep_time

    def execute(self, context):
        self.defer(
            trigger=DummySleepTrigger(sleep_time=self.sleep_time),
            method_name="resumed_method",
        )

    def resumed_method(self, context, event=None):
        self.log.info(f"Resumed! Event details: {event}")
        # Return a dummy dictionary, just like the real operator
        return {"status": "success", "dummy_data": [1, 2, 3]}


@dag(
    dag_id="ingestion_dummy_dag",
    start_date=pendulum.datetime(2025, 1, 1),
)
def ingestion_dummy_dag():
    BashOperator(
        task_id="process_data_dummy",
        bash_command=(
            "echo '--- Starting dummy ingestion work ---' && sleep 60 "
            "&& echo '--- Dummy ingestion work finished ---'"
        ),
    )


ingestion_dummy_dag()


@dag(
    dag_id="monitor_dummy_dag",
    start_date=pendulum.datetime(2025, 1, 1),
    schedule="@continuous",
    max_active_runs=1,
)
def monitor_dummy_dag():
    monitor_task = DummySleepOperator(task_id="dummy_monitor_task", sleep_time=10)
    trigger_ingestion_task = TriggerDagRunOperator(
        task_id="trigger_ingestion_dummy_dag",
        trigger_dag_id="ingestion_dummy_dag",
        wait_for_completion=False,
        conf=monitor_task.output,
    )
    monitor_task >> trigger_ingestion_task


monitor_dummy_dag()
```
Looking at this, the monitor DAG should defer (it would wait for events; here
it just sleeps for demonstration). Once the sleep is done, the scheduler picks
up the TriggerEvent, calls method_name, and the TriggerDagRunOperator then
triggers the next DAG. Once the ingestion DAG is triggered, the monitor task
should go back into the deferred state and sleep again, but instead it gets
queued and stays there for the entire runtime of the ingestion DAG.
I tested without defer and it works, so the problem is not in
TriggerDagRunOperator.
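For readers unfamiliar with the defer handshake the snippet relies on, it can be sketched without Airflow at all. The classes below (FakeTriggerEvent, FakeSleepTrigger, triggerer_loop) are illustrative stand-ins, not Airflow APIs; the point is only that the triggerer consumes the trigger's async `run()` generator and hands the first event's payload back, after which the scheduler re-queues the task to call `method_name`:

```python
import asyncio
from typing import Any, AsyncIterator


class FakeTriggerEvent:
    """Stand-in for airflow.triggers.base.TriggerEvent: just wraps a payload."""

    def __init__(self, payload: dict[str, Any]):
        self.payload = payload


class FakeSleepTrigger:
    """Mimics DummySleepTrigger from the snippet above, minus Airflow imports."""

    def __init__(self, sleep_time: float):
        self.sleep_time = sleep_time

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Classpath plus kwargs, matching the BaseTrigger.serialize contract
        return ("dags.dummy.FakeSleepTrigger", {"sleep_time": self.sleep_time})

    async def run(self) -> AsyncIterator[FakeTriggerEvent]:
        await asyncio.sleep(self.sleep_time)
        yield FakeTriggerEvent({"status": "success"})


async def triggerer_loop(trigger: FakeSleepTrigger) -> dict[str, Any]:
    # The triggerer awaits the first event from the async generator; the
    # scheduler then resumes the deferred task with this payload as `event`.
    async for event in trigger.run():
        return event.payload
    raise RuntimeError("trigger exited without firing an event")


payload = asyncio.run(triggerer_loop(FakeSleepTrigger(0.01)))
print(payload)  # {'status': 'success'}
```

In the real setup, the same round trip should happen on every `@continuous` run of monitor_dummy_dag, which is why the task getting stuck in queued between runs is unexpected.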
GitHub link:
https://github.com/apache/airflow/discussions/51875#discussioncomment-13519004