internetcoffeephone opened a new issue, #28296:
URL: https://github.com/apache/airflow/issues/28296
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
Using Airflow 2.4.2.
I've got a task that retrieves some filenames, and the result is then used to
create dynamically mapped tasks that move the files, one file per task.
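For context, the pattern looks roughly like this (a minimal sketch; the task
names and the hard-coded return value are placeholders, not the real DAG):
```
import datetime as dt

from airflow import DAG
from airflow.decorators import task


@task
def get_filenames():
    # Placeholder for the real lookup, which lists files in storage.
    return ["a.csv", "b.csv"]


@task
def move_file(file_name):
    # Placeholder for the real move; one mapped task instance per file.
    print(f"moving {file_name}")


with DAG(dag_id="move_files_sketch", start_date=dt.datetime(2022, 12, 10), schedule="@daily"):
    # Each filename returned upstream becomes one mapped task instance.
    move_file.expand(file_name=get_filenames())
```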
I'm using a similar task across multiple DAGs. However, task mapping fails on
some DAG runs: it happens inconsistently from run to run, and some DAGs do not
seem to be affected at all. The unaffected DAGs seem to be the ones where no
task was ever mapped, so the mapped task instance always ended up in a Skipped
state.
What happens is that multiple files are found, but only a single dynamically
mapped task is created. This task never starts and has a `map_index` of -1. It
can be found under the "List instances, all runs" menu, but the "Mapped Tasks"
tab says "No Data found."
When I select the mapped task and press the "Run" button, the following error
appears:
```
Could not queue task instance for execution, dependencies not met:
Previous Dagrun State: depends_on_past is true for this task's DAG, but the
previous task instance has not run yet., Task has been mapped: The task has yet
to be mapped!
```
However, the previous task instance *has* run. No errors appeared in my Airflow logs.
### What you think should happen instead
The appropriate number of task instances should be created; they should
correctly resolve the `depends_on_past` check and then proceed to run.
### How to reproduce
This DAG reliably reproduces the error for me. The first set of mapped tasks
succeeds, but the subsequent ones do not.
```
import datetime as dt

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator


@task
def get_filenames_kwargs():
    return [
        {
            "file_name": i,
        }
        for i in range(10)
    ]


def print_filename(file_name):
    print(file_name)


with DAG(
    dag_id="dtm_test",
    start_date=dt.datetime(2022, 12, 10),
    default_args={
        "owner": "airflow",
        "depends_on_past": True,
    },
    schedule="@daily",
) as dag:
    get_filenames_task = get_filenames_kwargs.override(task_id="get_filenames_task")()

    print_filename_task = PythonOperator.partial(
        task_id="print_filename_task",
        python_callable=print_filename,
    ).expand(op_kwargs=get_filenames_task)

    # Perhaps redundant
    get_filenames_task >> print_filename_task
```
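To generate several consecutive runs without waiting on the daily schedule, a
backfill over a few dates should work (assuming the standard Airflow 2 CLI; the
dates are examples): `airflow dags backfill -s 2022-12-10 -e 2022-12-13 dtm_test`.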
### Operating System
Amazon Linux 2
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other Docker-based deployment
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)