rubenbriones commented on issue #29202:
URL: https://github.com/apache/airflow/issues/29202#issuecomment-1407608339

   Hello again. I thought the problem was solved in version 2.5.1, but that does not seem to be entirely the case: it appears to depend on how long the tasks take to run. With the MWE I posted earlier everything does work correctly. However, if I add a `time.sleep(X)` to the tasks, I get the problem I described. In the example below, each mapped `extract` task sleeps for 1, 2 or 3 seconds:
   
   ```py
   import time
   import pendulum
   from airflow.decorators import dag, task
   
   
   @dag(
       schedule="0 0 * * MON-FRI",
       start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
       catchup=True,
       max_active_runs=1,  # <-- I have tried removing this, and the problem persists.
   )
   def etl_sleep():
   
       @task
       def get_symbols():
           res = [('A', 1, 111), ('B', 2, 222), ('C', 3, 333)]
           return res
   
       @task
       def extract(symbol_info, data_interval_end=None):
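           # Do some work... (simulated: each mapped instance sleeps 1, 2 or 3 seconds,
           # so the instances finish at different times)
           time.sleep(symbol_info[1])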
           return symbol_info
   
       @task(trigger_rule="one_success")
       def transform(symbol_info, data_interval_end=None):
           # Do some work...
           return symbol_info
   
       @task(trigger_rule="one_success")
       def load(symbol_info, data_interval_end=None):
           # Do some work...
           return symbol_info
   
       # DAG
       symbols = get_symbols()
       raw_symbols_data = extract.expand(symbol_info=symbols)
       clean_symbols_data = transform.expand(symbol_info=raw_symbols_data)
       load.expand(symbol_info=clean_symbols_data)
   
   
   etl_sleep()
   ```
   
   Looking at the logs, I see that with the code above, the first mapped `extract` task to finish logs the following:
   
   `[2023-01-29, 09:05:08 UTC] {taskinstance.py:2578} INFO - 1 downstream tasks scheduled from follow-on schedule check`
   
   I have even seen cases in which every task logs `0 downstream tasks scheduled`. The next `transform` task still runs, but only for one mapped index instead of all 3.
   
   On the other hand, if I remove the sleep, the first task to finish does schedule all 3 downstream `transform` tasks, one for each mapped index:
   
   `[2023-01-29, 09:14:44 UTC] {taskinstance.py:2578} INFO - 3 downstream tasks scheduled from follow-on schedule check`
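
To compare these counts across runs (with and without the sleep), a small helper along these lines can pull them out of the task logs. This is only a sketch: `follow_on_counts` is a hypothetical name, not part of Airflow, and the regex assumes the exact message wording quoted above from 2.5.1 (other versions may word it differently).

```python
import re

# Matches the follow-on schedule check message quoted above, e.g.
# "... {taskinstance.py:2578} INFO - 1 downstream tasks scheduled from follow-on schedule check"
# Assumption: the wording is exactly as logged by Airflow 2.5.1.
FOLLOW_ON_RE = re.compile(
    r"INFO - (\d+) downstream tasks scheduled from follow-on schedule check"
)


def follow_on_counts(log_text: str) -> list[int]:
    """Return every 'N downstream tasks scheduled' count found in a task log, in order."""
    return [int(m.group(1)) for m in FOLLOW_ON_RE.finditer(log_text)]


if __name__ == "__main__":
    # The two log lines from this comment: with the sleep, then without it.
    sample_log = "\n".join([
        "[2023-01-29, 09:05:08 UTC] {taskinstance.py:2578} INFO - "
        "1 downstream tasks scheduled from follow-on schedule check",
        "[2023-01-29, 09:14:44 UTC] {taskinstance.py:2578} INFO - "
        "3 downstream tasks scheduled from follow-on schedule check",
    ])
    print(follow_on_counts(sample_log))  # [1, 3]
```

With three mapped indexes, a count below 3 on the first `extract` task to finish matches the behavior described above.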
   
   ----
   Airflow Version: [v2.5.1](https://pypi.python.org/pypi/apache-airflow/2.5.1)
   Git Version: .release:2.5.1+49867b660b6231c1319969217bc61917f7cf9829

