rcheatham-q opened a new issue, #25681:
URL: https://github.com/apache/airflow/issues/25681
### Apache Airflow version
2.3.3
### What happened
The scheduler crashed when attempting to queue a dynamically mapped task
which is directly downstream and only dependent on another dynamically mapped
task.
### What you think should happen instead
The scheduler does not crash and the dynamically mapped task executes
normally
### How to reproduce
### Setup
- one DAG with two tasks, one directly downstream of the other
- the DAG has a schedule (e.g. @hourly)
- both tasks use task expansion
- the second task uses the output of the first task as its expansion
parameter
- the scheduler's pool size is smaller than the number of map indices in
each task
### Steps to reproduce
1. enable the DAG and let it run
### Operating System
macOS and Dockerized Linux on macOS
### Versions of Apache Airflow Providers
None
### Deployment
Other
### Deployment details
I have tested and confirmed this bug is present in three separate
deployments:
1. `airflow standalone`
2. DaskExecutor using docker compose
3. KubernetesExecutor using Docker Desktop's builtin Kubernetes cluster
All three of these deployments were executed locally on a MacBook Pro.
### 1. `airflow standalone`
I created a new Python 3.9 virtual environment, installed Airflow 2.3.3,
configured a few environment variables, and executed `airflow standalone`. Here
is a bash script that completes all of these tasks:
<details><summary>airflow_standalone.sh</summary>
```bash
#!/bin/bash
# ensure working dir is correct
DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
cd "$DIR"
set -x
# set version parameters
AIRFLOW_VERSION="2.3.3"
PYTHON_VERSION="3.9"
# configure Python environment
if [ ! -d "$DIR/.env" ]
then
    python3 -m venv "$DIR/.env"
fi
source "$DIR/.env/bin/activate"
pip install --upgrade pip
# install Airflow
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# configure Airflow
export AIRFLOW_HOME="$DIR/.airflow"
export AIRFLOW__CORE__DAGS_FOLDER="$DIR/dags"
export AIRFLOW__CORE__LOAD_EXAMPLES="False"
export AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS="False"
# start Airflow
exec "$DIR/.env/bin/airflow" standalone
```
</details>
Here is the DAG code that can be placed in a `dags` directory in the same
location as the above script. Note that this
DAG code triggers the bug in all environments I tested.
<details><summary>bug_test.py</summary>
```python
import pendulum
from airflow.decorators import dag, task
@dag(
    'bug_test',
    schedule_interval='@hourly',
    start_date=pendulum.now().add(hours=-2)
)
def test_scheduler_bug():

    @task
    def do_something(i):
        return i + 10

    @task
    def do_something_else(i):
        import logging
        log = logging.getLogger('airflow.task')
        log.info("I'll never run")

    nums = do_something.expand(i=[i + 1 for i in range(20)])
    do_something_else.expand(i=nums)

TEST_DAG = test_scheduler_bug()
```
</details>
Once set up, simply activating the DAG will demonstrate the bug.
### 2. DaskExecutor on docker compose with Postgres 12
I cannot provide a full replication of this setup as it is rather involved.
The Docker image starts from `python:3.9-slim` and installs Airflow with the
appropriate constraints. It has many additional packages installed, both
system and Python, and a custom entrypoint that can run the Dask scheduler in
addition to regular Airflow commands. Here are the applicable Airflow
configuration values:
<details><summary>airflow.cfg</summary>
```conf
[core]
donot_pickle = False
executor = DaskExecutor
load_examples = False
max_active_tasks_per_dag = 16
parallelism = 4
[scheduler]
dag_dir_list_interval = 0
catchup_by_default = False
parsing_processes = 3
scheduler_health_check_threshold = 90
```
</details>
Here is a docker-compose file that is nearly identical to the one I use (I
just removed unrelated bits):
<details><summary>docker-compose.yml</summary>
```yml
version: '3.7'
services:
  metastore:
    image: postgres:12-alpine
    ports:
      - 5432:5432
    container_name: airflow-metastore
    volumes:
      - ${AIRFLOW_HOME_DIR}/pgdata:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: ${AIRFLOW_DB_PASSWORD}
      PGDATA: /var/lib/postgresql/data/pgdata
  airflow-webserver:
    image: 'my_custom_image:tag'
    ports:
      - '8080:8080'
    depends_on:
      - metastore
    container_name: airflow-webserver
    environment:
      AIRFLOW_HOME: /opt/airflow
      AIRFLOW__WEBSERVER__SECRET_KEY: ${AIRFLOW_SECRET_KEY}
      AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD}@metastore:5432/${AIRFLOW_DB_DATABASE}
    env_file: container_vars.env
    command:
      - webserver
      - --daemon
      - --access-logfile
      - /var/log/airflow/webserver-access.log
      - --error-logfile
      - /var/log/airflow/webserver-errors.log
      - --log-file
      - /var/log/airflow/webserver.log
    volumes:
      - ${AIRFLOW_HOME_DIR}/logs:/var/log/airflow
  airflow-scheduler:
    image: 'my_custom_image:tag'
    depends_on:
      - metastore
      - dask-scheduler
    container_name: airflow-scheduler
    environment:
      AIRFLOW_HOME: /opt/airflow
      AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD}@metastore:5432/${AIRFLOW_DB_DATABASE}
      SCHEDULER_RESTART_INTERVAL: ${SCHEDULER_RESTART_INTERVAL}
    env_file: container_vars.env
    restart: unless-stopped
    command:
      - scheduler
      - --daemon
      - --log-file
      - /var/log/airflow/scheduler.log
    volumes:
      - ${AIRFLOW_HOME_DIR}/logs:/var/log/airflow
  dask-scheduler:
    image: 'my_custom_image:tag'
    ports:
      - 8787:8787
    container_name: airflow-dask-scheduler
    command:
      - dask-scheduler
  dask-worker:
    image: 'my_custom_image:tag'
    depends_on:
      - dask-scheduler
      - metastore
    container_name: airflow-worker
    environment:
      AIRFLOW_HOME: /opt/airflow
      AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD}@metastore:5432/${AIRFLOW_DB_DATABASE}
    env_file: container_vars.env
    command:
      - dask-worker
      - dask-scheduler:8786
      - --nprocs
      - '8'
      - --nthreads
      - '1'
    volumes:
      - ${AIRFLOW_HOME_DIR}/logs:/var/log/airflow
```
</details>
I also had to manually change the default pool size to 15 in the UI in order
to trigger the bug. With the default pool set to 128 the bug did not trigger.
### 3. KubernetesExecutor on Docker Desktop builtin Kubernetes cluster with
Postgres 11
This uses the official [Airflow Helm
Chart](https://airflow.apache.org/docs/helm-chart/stable/index.html) with the
following values overrides:
<details><summary>values.yaml</summary>
```yml
defaultAirflowRepository: my_custom_image
defaultAirflowTag: "my_image_tag"
airflowVersion: "2.3.3"
executor: "KubernetesExecutor"
webserverSecretKeySecretName: airflow-webserver-secret-key
fernetKeySecretName: airflow-fernet-key
config:
  webserver:
    expose_config: 'True'
    base_url: http://localhost:8080
  scheduler:
    catchup_by_default: 'False'
  api:
    auth_backends: airflow.api.auth.backend.default
triggerer:
  enabled: false
statsd:
  enabled: false
redis:
  enabled: false
cleanup:
  enabled: false
logs:
  persistence:
    enabled: true
workers:
  extraVolumes:
    - name: airflow-dags
      hostPath:
        path: /local/path/to/dags
        type: Directory
  extraVolumeMounts:
    - name: airflow-dags
      mountPath: /opt/airflow/dags
      readOnly: true
scheduler:
  extraVolumes:
    - name: airflow-dags
      hostPath:
        path: /local/path/to/dags
        type: Directory
  extraVolumeMounts:
    - name: airflow-dags
      mountPath: /opt/airflow/dags
      readOnly: true
```
</details>
The docker image is the official `airflow:2.3.3-python3.9` image with a
single environment variable modified:
```conf
PYTHONPATH="/opt/airflow/dags/repo/dags:${PYTHONPATH}"
```
### Anything else
This is my understanding of the timeline that produces the crash:
1. The scheduler queues some of the subtasks in the first task
1. Some subtasks run and yield their XCom results
1. The scheduler runs, queueing the remainder of the subtasks for the first
task and creates some subtasks in the second task using the XCom results
produced thus far
1. The remainder of the subtasks from the first task complete
1. The scheduler attempts to recreate all of the subtasks of the second
task, including the ones already created, and a unique constraint in the
database is violated and the scheduler crashes
1. When the scheduler restarts, it attempts the previous step again and
crashes again, thus entering a crash loop
It seems that if some, but not all, of the second task's subtasks have been
created when the scheduler attempts to queue the mapped task, the scheduler
tries to create all of the subtasks again, which causes a unique constraint
violation.
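The constraint involved is, to my understanding, the task instance primary key, which since Airflow 2.3 includes `map_index`. The following is a minimal sketch of the failure mode using the standard-library `sqlite3` with a simplified stand-in schema (the table and column names only loosely mirror Airflow's internals, and `expand` here is a hypothetical helper, not scheduler code):

```python
import sqlite3

# Simplified stand-in for Airflow's task_instance table; since 2.3 its
# primary key includes map_index, so each mapped task instance is one row
# keyed by (dag_id, task_id, run_id, map_index).
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE task_instance (
        dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER,
        PRIMARY KEY (dag_id, task_id, run_id, map_index)
    )
""")

def expand(map_indices):
    """Naively create one row per map index, as if expanding from scratch."""
    conn.executemany(
        "INSERT INTO task_instance VALUES ('bug_test', 'do_something_else', 'run1', ?)",
        [(i,) for i in map_indices],
    )

# First scheduling pass: only part of the upstream XCom exists yet,
# so only some of the second task's subtasks are created.
expand(range(15))

# Later pass: the scheduler tries to create *all* subtasks again, including
# the 15 that already exist, and the primary key rejects the duplicates.
try:
    expand(range(20))
except sqlite3.IntegrityError as e:
    print(f"IntegrityError: {e}")
```

Because the duplicate rows are still there on restart, retrying the same expansion fails the same way, which matches the crash loop described above.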
**NOTES**
- If the scheduler can queue as many or more task instances as there are map
indices for the task, this won't happen. The provided test case succeeded on
the DaskExecutor deployment when the default pool was 128; when I reduced
that pool to 15, the bug occurred.
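That condition can be stated as a tiny predicate (the threshold of 20 comes from the `range(20)` in the test DAG; this is just my reading of the observed behavior, not scheduler code):

```python
def can_trigger_bug(pool_slots: int, num_map_indices: int) -> bool:
    """The race only arises when the pool cannot queue every mapped task
    instance of the first task at once: the upstream then completes across
    multiple scheduling passes, and the second task is expanded while only
    part of the upstream XCom exists."""
    return pool_slots < num_map_indices

print(can_trigger_bug(128, 20))  # default pool of 128: bug did not trigger
print(can_trigger_bug(15, 20))   # pool reduced to 15: bug occurred
```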
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)