dominik-d-stampli opened a new issue, #47398:
URL: https://github.com/apache/airflow/issues/47398

   ### Apache Airflow version
   
   2.10.5
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Hey, it seems that Dataset events are not always triggering the downstream DAG, even when there is free capacity in the cluster. I'm writing here because I'm not sure whether this is a bug or expected Airflow behavior.
   
   I have a small demo to showcase what I mean. It consists of two DAGs, a producer and a consumer: one creates events on a dataset, and the other runs whenever new dataset events arrive. A single run of the producer creates 4 new events, and each one should be processed by the consumer DAG. Instead, only some of those events are processed. On the first run, 2 out of 4 events were processed. After the second run of the producer, the consumer was triggered with 3 events: two from the previous producer run and one from the new batch. That leaves three events not processed at all.
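   If the scheduler batches every event queued since the last consumer run into a single triggered run, fewer consumer runs than events would be expected; what that would not explain is events never being processed at all. Here is a toy model of such batching (my assumption about the scheduling behavior, not Airflow's actual code), just to make the distinction concrete:

   ```python
   from collections import deque

   # Toy model (an assumption, not Airflow's real scheduler): producer runs
   # enqueue dataset events, and each scheduler pass drains the entire queue
   # into one triggered consumer run.
   event_queue: deque = deque()

   def produce(batch: list[str]) -> None:
       """A producer run appends one event per element."""
       event_queue.extend(batch)

   def trigger_consumer() -> list[str]:
       """Drain all queued events into a single consumer run."""
       events = list(event_queue)
       event_queue.clear()
       return events

   produce(["football", "jogging", "biking", "hiking"])
   first_run = trigger_consumer()   # a single run carrying all 4 events
   produce(["football", "jogging", "biking", "hiking"])
   second_run = trigger_consumer()  # again one run, carrying only the new batch
   ```

   Under this model every event is eventually seen by some consumer run, just not one run per event; the leftover events that are never processed at all are what still looks like a bug to me.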
   
   It also seems that the event source is sometimes missing from the Airflow Dataset events view.
   
   <img width="1920" alt="Image" src="https://github.com/user-attachments/assets/e76395c6-3ccd-40b3-a53d-de772e147953" />
   
   I don't see anything about the remaining events in the scheduler's logs.
   
   I originally noticed this problem in Amazon's MWAA service, then wanted to check whether I could reproduce it locally, and indeed I was able to reproduce it in Airflow running in Docker Compose.
   
   ### What you think should happen instead?
   
   I would expect all dataset events to trigger the downstream DAG.
   
   ### How to reproduce
   
   1. Run Airflow in Docker Compose: 
https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
   2. Create producer.py DAG:
   
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.datasets import Dataset
   
   START_DATASET = Dataset("DA://start")
   
   with DAG(
       dag_id="producer",
       schedule=None,
       start_date=datetime(2024, 12, 3),
       catchup=False,
   ) as dag:
       @task
       def generate_data():
           return ["football ⚽", "jogging 🏃", "biking 🚴", "hiking 🥾"]

       @task(outlets=[START_DATASET])
       def trigger_down_streams(element: str, **context):
           # Attach the activity as extra metadata on the emitted dataset event.
           context["outlet_events"][START_DATASET].extra = {"Activity": element}

       generated_data = generate_data()
       # Dynamic task mapping: one mapped task instance, and one dataset event, per element.
       trigger_down_streams.expand(element=generated_data)
   ```
   
   3. Create consumer.py DAG:
   
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.datasets import Dataset
   
   START_DATASET = Dataset("DA://start")
   
   with DAG(
       dag_id="consumer",
       schedule=[START_DATASET],
       start_date=datetime(2024, 12, 3),
       catchup=False,
   ) as dag:
       @task
       def print_triggering_dataset_events(**context):
           # Mapping of dataset URI -> list of events that triggered this run.
           triggering_dataset_events = context.get("triggering_dataset_events")
           print(triggering_dataset_events)

       print_triggering_dataset_events()
   ```
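   For reference, here is a standalone sketch of the structure I believe `triggering_dataset_events` has (a mapping of dataset URI to the list of events behind the run), using plain dicts in place of Airflow's DatasetEvent objects so it can be run outside Airflow:

   ```python
   # Plain-dict stand-ins for Airflow's DatasetEvent objects (an assumption
   # about the shape; the real values are ORM objects with an `extra` field).
   triggering_dataset_events = {
       "DA://start": [
           {"extra": {"Activity": "football ⚽"}},
           {"extra": {"Activity": "jogging 🏃"}},
       ],
   }

   def count_events(events_by_uri: dict) -> dict:
       """Count how many events each dataset contributed to this run."""
       return {uri: len(events) for uri, events in events_by_uri.items()}

   # With the consumer above, a count of 2 instead of 4 for "DA://start" is
   # what I observed on the first producer run.
   print(count_events(triggering_dataset_events))
   ```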
   
   ### Operating System
   
   macOS, using Docker Compose
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

