internetcoffeephone opened a new issue, #28296:
URL: https://github.com/apache/airflow/issues/28296

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Using Airflow 2.4.2.
   
   I've got a task that retrieves some filenames and then creates dynamically mapped tasks to move the files, one per task. I use a similar task across multiple DAGs. However, task mapping fails on some DAG runs: it happens inconsistently per DAG run, and some DAGs do not seem to be affected at all. The unaffected DAGs appear to be those where no task was ever mapped, so the mapped task instance ended up in a Skipped state.
   
   What happens is that multiple files are found, but only a single dynamically mapped task is created. This task never starts and has a map_index of -1. It can be found under the "List instances, all runs" menu, but the "Mapped Tasks" tab shows "No Data found."
   
   When I press the "Run" button with the mapped task selected, the following error appears:
   
   ```
   Could not queue task instance for execution, dependencies not met: Previous Dagrun State: depends_on_past is true for this task's DAG, but the previous task instance has not run yet., Task has been mapped: The task has yet to be mapped!
   ```
   
   However, the previous task instance *has* run. No errors appeared in my Airflow logs.
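   
   For what it's worth, the previous run's task instances can be inspected directly; a minimal sketch, assuming it is run inside the Airflow environment with access to the metadata database:
   
   ```python
   # Minimal sketch: print the state of every task instance in the last two
   # runs of the reproduction DAG below, to confirm that the previous
   # print_filename_task instance really did run.
   from airflow.models import DagRun
   
   runs = DagRun.find(dag_id="dtm_test")  # dag_id from the repro DAG below
   for run in sorted(runs, key=lambda r: r.execution_date)[-2:]:
       for ti in run.get_task_instances():
           print(run.run_id, ti.task_id, ti.map_index, ti.state)
   ```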
   
   ### What you think should happen instead
   
   The appropriate number of task instances should be created; they should correctly resolve the `depends_on_past` check and then proceed to run correctly.
   
   ### How to reproduce
   
   This DAG reliably reproduces the error for me. The first set of mapped tasks succeeds; the subsequent ones do not.
   
   ```python
   import datetime as dt
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.operators.python import PythonOperator
   
   
   @task
   def get_filenames_kwargs():
       # Stand-in for the real filename lookup: one op_kwargs dict per mapped task.
       return [
           {
               "file_name": i,
           }
           for i in range(10)
       ]
   
   
   def print_filename(file_name):
       print(file_name)
   
   
   with DAG(
       dag_id="dtm_test",
       start_date=dt.datetime(2022, 12, 10),
       default_args={
           "owner": "airflow",
           "depends_on_past": True,
       },
       schedule="@daily",
   ) as dag:
       get_filenames_task = get_filenames_kwargs.override(task_id="get_filenames_task")()
   
       # One mapped print_filename_task instance per dict returned above.
       print_filename_task = PythonOperator.partial(
           task_id="print_filename_task",
           python_callable=print_filename,
       ).expand(op_kwargs=get_filenames_task)
   
       # Perhaps redundant, since expand() already sets this dependency.
       get_filenames_task >> print_filename_task
   ```
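   
   For completeness, here is a variant I would expect to sidestep the failing dependency check, assuming operator-level arguments override `default_args` as documented. This is only a sketch of a possible workaround; I have not verified it against this bug:
   
   ```python
   # Hypothetical workaround sketch: disable depends_on_past for the mapped
   # task only, leaving it enabled (via default_args) for get_filenames_task.
   print_filename_task = PythonOperator.partial(
       task_id="print_filename_task",
       python_callable=print_filename,
       depends_on_past=False,
   ).expand(op_kwargs=get_filenames_task)
   ```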
   
   
   ### Operating System
   
   Amazon Linux 2
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

