dominik-d-stampli opened a new issue, #47398: URL: https://github.com/apache/airflow/issues/47398
### Apache Airflow version

2.10.5

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

Hey, it seems that Dataset events do not always trigger the downstream DAG, even when there is free capacity in the cluster. I'm writing here because I'm not sure whether this is a bug or expected Airflow behavior.

I have a small demo to showcase what I mean. It consists of two DAGs, a producer and a consumer: the first creates events on a dataset, and the second runs whenever new dataset events occur. A single run of the producer creates 4 new events, and each one should be processed by the consumer DAG. Instead, only some of those events are processed, not all. On the first run, 2 out of 4 events were processed. After the second run of the producer, the consumer was triggered with 3 events: two from the previous producer run and one from the new batch. That leaves three events not processed at all. It also seems that the event source is sometimes missing in the Airflow Dataset events view.

<img width="1920" alt="Image" src="https://github.com/user-attachments/assets/e76395c6-3ccd-40b3-a53d-de772e147953" />

I don't see anything about the remaining events in the Scheduler's logs. I originally noticed this problem in Amazon's MWAA service and then checked whether I could reproduce it locally; indeed, it reproduces in Airflow running under Docker Compose.

### What you think should happen instead?

I would expect every dataset event to trigger the downstream DAG.

### How to reproduce

1. Run Airflow in Docker Compose: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

2. Create `producer.py`:

   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.decorators import task
   from airflow.datasets import Dataset

   START_DATASET = Dataset("DA://start")

   with DAG(
       dag_id="producer",
       schedule=None,
       start_date=datetime(2024, 12, 3),
       catchup=False,
   ) as dag:

       @task
       def generate_data():
           return ["football ⚽", "jogging 🏃", "biking 🚴", "hiking 🥾"]

       @task(outlets=[START_DATASET])
       def trigger_down_streams(element: str, **context):
           # Attach the element to the dataset event emitted by this mapped task.
           context["outlet_events"][START_DATASET].extra = {"Activity": element}

       generated_data = generate_data()
       # One mapped task instance (and hence one dataset event) per element.
       trigger_down_streams.expand(element=generated_data)
   ```

3. Create `consumer.py`:

   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.decorators import task
   from airflow.datasets import Dataset

   START_DATASET = Dataset("DA://start")

   with DAG(
       dag_id="consumer",
       schedule=[START_DATASET],
       start_date=datetime(2024, 12, 3),
       catchup=False,
   ) as dag:

       @task
       def print_triggering_dataset_events(**context):
           # Shows which dataset events fed this DAG run.
           triggering_dataset_events = context.get("triggering_dataset_events")
           print(triggering_dataset_events)

       print_triggering_dataset_events()
   ```

### Operating System

macOS (running Airflow via Docker Compose)

### Versions of Apache Airflow Providers

_No response_

### Deployment

Docker-Compose

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
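As a side note on the demo: the `triggering_dataset_events` printout from the consumer can be reduced to per-dataset event counts to make the gap concrete. The helper below is a plain-Python sketch, not part of the demo; the event payloads are illustrative stand-ins for Airflow's DatasetEvent objects, carrying only the `extra` field the producer sets.

```python
# Sketch: tally how many dataset events fed one consumer run.
# `run_events` mimics the shape of the `triggering_dataset_events`
# context mapping (dataset URI -> list of events); the plain dicts
# stand in for DatasetEvent objects.

def tally_events(triggering_dataset_events):
    """Return {dataset_uri: number_of_events} for one triggered run."""
    return {uri: len(events) for uri, events in triggering_dataset_events.items()}

# A consumer run that, as observed above, only saw 2 of the 4 expected events:
run_events = {
    "DA://start": [
        {"extra": {"Activity": "football ⚽"}},
        {"extra": {"Activity": "jogging 🏃"}},
    ]
}

print(tally_events(run_events))  # {'DA://start': 2}
```

Comparing such counts across consecutive consumer runs against the 4 events each producer run emits makes the missing events easy to spot.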
