rcheatham-q opened a new issue, #25681:
URL: https://github.com/apache/airflow/issues/25681

   ### Apache Airflow version
   
   2.3.3
   
   ### What happened
   
   The scheduler crashed when attempting to queue a dynamically mapped task 
which is directly downstream and only dependent on another dynamically mapped 
task.
   
   ### What you think should happen instead
   
   The scheduler should not crash, and the dynamically mapped task should execute 
normally.
   
   ### How to reproduce
   
   ### Setup
   - one DAG with two tasks, one directly downstream of the other
   - the DAG has a schedule (e.g. @hourly)
   - both tasks use task expansion
   - the second task uses the output of the first task as its expansion 
parameter
   - the scheduler's pool size is smaller than the number of map indices in 
each task
   
   ### Steps to reproduce
   1. enable the DAG and let it run
   
   ### Operating System
   
   macOS and Dockerized Linux on macOS
   
   ### Versions of Apache Airflow Providers
   
   None
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   I have tested and confirmed this bug is present in three separate 
deployments:
   
   1. `airflow standalone`
   2. DaskExecutor using docker compose
   3. KubernetesExecutor using Docker Desktop's builtin Kubernetes cluster
   
   All three of these deployments were executed locally on a MacBook Pro.
   
   ### 1. `airflow standalone`
   I created a new Python 3.9 virtual environment, installed Airflow 2.3.3, 
configured a few environment variables, and executed `airflow standalone`. Here 
is a bash script that completes all of these tasks:
   
   <details><summary>airflow_standalone.sh</summary>
   
   ```bash
   #!/bin/bash
   
   # ensure working dir is correct
   DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
   cd $DIR
   
   set -x
   
   # set version parameters
   AIRFLOW_VERSION="2.3.3"
   PYTHON_VERSION="3.9"
   
   # configure Python environment
   # create the virtualenv only if it does not exist yet
   if [ ! -d "$DIR/.env" ]
   then
       python3 -m venv "$DIR/.env"
   fi
   
   source "$DIR/.env/bin/activate"
   pip install --upgrade pip
   
   # install Airflow
   
   CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
   pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
   
   # configure Airflow
   export AIRFLOW_HOME="$DIR/.airflow"
   export AIRFLOW__CORE__DAGS_FOLDER="$DIR/dags"
   export AIRFLOW__CORE__LOAD_EXAMPLES="False"
   export AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS="False"
   
   # start Airflow
   exec "$DIR/.env/bin/airflow" standalone
   ```
   </details>
   
   Here is the DAG code; place it in a `dags` directory in the same location as 
the above script. Note that this DAG code triggers the bug in every environment I 
tested.
   
   <details><summary>bug_test.py</summary>
   
   ```python
   import pendulum
   from airflow.decorators import dag, task
   
   @dag(
       'bug_test',
       schedule_interval='@hourly',
       start_date=pendulum.now().add(hours=-2)
   )
   def test_scheduler_bug():
       @task
       def do_something(i):
           return i + 10
   
       @task
       def do_something_else(i):
           import logging
           log = logging.getLogger('airflow.task')
           log.info("I'll never run")
   
       nums = do_something.expand(i=[i+1 for i in range(20)])
       do_something_else.expand(i=nums)
   
   TEST_DAG = test_scheduler_bug()
   ```
   </details>
   
   Once set up, simply activating the DAG will demonstrate the bug.
   
   ### 2. DaskExecutor on docker compose with Postgres 12
   
   I cannot provide a full replication of this setup as it is rather involved. 
The Docker image starts from `python:3.9-slim` and then installs Airflow with the 
appropriate constraints. It has many additional packages installed, both system 
and Python. It also has a custom entrypoint that can run the Dask scheduler in 
addition to regular Airflow commands. Here are the applicable Airflow 
configuration values:
   
   <details><summary>airflow.cfg</summary>
   
   ```conf
   [core]
   donot_pickle = False
   executor = DaskExecutor
   load_examples = False
   max_active_tasks_per_dag = 16
   parallelism = 4
   
   [scheduler]
   dag_dir_list_interval = 0
   catchup_by_default = False
   parsing_processes = 3
   scheduler_health_check_threshold = 90
   ```
   </details>
   
   Here is a docker-compose file that is nearly identical to the one I use (I 
just removed unrelated bits):
   
   <details><summary>docker-compose.yml</summary>
   
   ```yml
   version: '3.7'
   
   services:
       metastore:
           image: postgres:12-alpine
           ports:
               - 5432:5432
           container_name: airflow-metastore
           volumes:
               - ${AIRFLOW_HOME_DIR}/pgdata:/var/lib/postgresql/data
           environment:
               POSTGRES_USER: airflow
               POSTGRES_PASSWORD: ${AIRFLOW_DB_PASSWORD}
               PGDATA: /var/lib/postgresql/data/pgdata
   
       airflow-webserver:
           image: 'my_custom_image:tag'
           ports:
               - '8080:8080'
           depends_on:
               - metastore
           container_name: airflow-webserver
           environment:
               AIRFLOW_HOME: /opt/airflow
               AIRFLOW__WEBSERVER__SECRET_KEY: ${AIRFLOW_SECRET_KEY}
               AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
               AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD}@metastore:5432/${AIRFLOW_DB_DATABASE}
           env_file: container_vars.env
           command: 
               - webserver
               - --daemon
               - --access-logfile
               - /var/log/airflow/webserver-access.log
               - --error-logfile
               - /var/log/airflow/webserver-errors.log
               - --log-file
               - /var/log/airflow/webserver.log
           volumes:
               - ${AIRFLOW_HOME_DIR}/logs:/var/log/airflow
   
       airflow-scheduler:
           image: 'my_custom_image:tag'
           depends_on:
               - metastore
               - dask-scheduler
           container_name: airflow-scheduler
           environment:
               AIRFLOW_HOME: /opt/airflow
               AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
               AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD}@metastore:5432/${AIRFLOW_DB_DATABASE}
               SCHEDULER_RESTART_INTERVAL: ${SCHEDULER_RESTART_INTERVAL}
           env_file: container_vars.env
           restart: unless-stopped
           command:
               - scheduler
               - --daemon
               - --log-file
               - /var/log/airflow/scheduler.log
           volumes:
               - ${AIRFLOW_HOME_DIR}/logs:/var/log/airflow
   
       dask-scheduler:
           image: 'my_custom_image:tag'
           ports:
               - 8787:8787
           container_name: airflow-dask-scheduler
           command:
               - dask-scheduler
           
       dask-worker:
           image: 'my_custom_image:tag'
           depends_on:
               - dask-scheduler
               - metastore
           container_name: airflow-worker
           environment:
               AIRFLOW_HOME: /opt/airflow
               AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
               AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD}@metastore:5432/${AIRFLOW_DB_DATABASE}
           env_file: container_vars.env
           command:
               - dask-worker
               - dask-scheduler:8786
               - --nprocs
               - '8'
               - --nthreads
               - '1'
           volumes:
               - ${AIRFLOW_HOME_DIR}/logs:/var/log/airflow
   ```
   </details>
   
   I also had to manually change the default pool size to 15 in the UI in order 
to trigger the bug. With the default pool set to 128 the bug did not trigger.
   
   ### 3. KubernetesExecutor on Docker Desktop builtin Kubernetes cluster with 
Postgres 11
   
   This uses the official [Airflow Helm 
Chart](https://airflow.apache.org/docs/helm-chart/stable/index.html) with the 
following values overrides:
   
   <details><summary>values.yaml</summary>
   
   ```yml
   defaultAirflowRepository: my_custom_image
   defaultAirflowTag: "my_image_tag"
   
   airflowVersion: "2.3.3"
   executor: "KubernetesExecutor"
   
   webserverSecretKeySecretName: airflow-webserver-secret-key
   fernetKeySecretName: airflow-fernet-key
   
   config:
     webserver:
       expose_config: 'True'
       base_url: http://localhost:8080
     scheduler:
       catchup_by_default: 'False'
     api:
       auth_backends: airflow.api.auth.backend.default
   
   triggerer:
     enabled: false
   
   statsd:
     enabled: false
   
   redis:
     enabled: false
   
   cleanup:
     enabled: false
   
   logs:
     persistence:
       enabled: true
   
   workers:
     extraVolumes:
     - name: airflow-dags
       hostPath:
         path: /local/path/to/dags
         type: Directory
     extraVolumeMounts:
     - name: airflow-dags
       mountPath: /opt/airflow/dags
       readOnly: true
   
   scheduler:
     extraVolumes:
     - name: airflow-dags
       hostPath:
         path: /local/path/to/dags
         type: Directory
     extraVolumeMounts:
     - name: airflow-dags
       mountPath: /opt/airflow/dags
       readOnly: true
   ```
   </details>
   
   The docker image is the official `airflow:2.3.3-python3.9` image with a 
single environment variable modified:
   
   ```conf
   PYTHONPATH="/opt/airflow/dags/repo/dags:${PYTHONPATH}"
   ```
   
   
   ### Anything else
   
   This is my understanding of the timeline that produces the crash:
   
   1. The scheduler queues some of the subtasks in the first task
   1. Some subtasks run and yield their XCom results
   1. The scheduler runs again, queues the remainder of the subtasks for the 
first task, and creates some subtasks in the second task using the XCom results 
produced thus far
   1. The remainder of the subtasks from the first task complete
   1. The scheduler attempts to recreate all of the subtasks of the second task, 
including the ones already created; this violates a unique constraint in the 
database and the scheduler crashes
   1. When the scheduler restarts, it attempts the previous step again and 
crashes again, thus entering a crash loop
   
   It seems that if some, but not all, subtasks for the second task have already 
been created when the scheduler attempts to queue the mapped task, the scheduler 
tries to create all of the subtasks again, which causes a unique constraint 
violation.
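
   The partial-creation failure mode described above can be illustrated with a 
small Python sketch. This is hypothetical code, not Airflow's actual scheduler 
internals: it models task-instance rows as `(task_id, map_index)` keys in a set 
standing in for the database table, and shows why re-inserting every key when 
some already exist trips a unique-key check, while inserting only the missing 
keys succeeds.

   ```python
   # Hypothetical model of the task_instance table: 15 of 20 map indices
   # for the second task were created on an earlier scheduler pass.
   existing = {("do_something_else", i) for i in range(15)}

   def create_task_instances(keys, table):
       """Insert rows, raising on duplicates like a DB unique constraint."""
       for key in keys:
           if key in table:
               raise ValueError(f"UniqueViolation: duplicate key {key}")
           table.add(key)

   all_keys = [("do_something_else", i) for i in range(20)]

   # Observed behaviour: the scheduler tries to create ALL map indices
   # again, and the very first duplicate key raises.
   try:
       create_task_instances(all_keys, set(existing))
   except ValueError as e:
       print(e)  # duplicate key on map index 0

   # Expected behaviour: create only the map indices that do not exist yet.
   missing = [k for k in all_keys if k not in existing]
   create_task_instances(missing, existing)
   print(len(existing))  # all 20 task instances now present
   ```

   Under this model, restarting the scheduler changes nothing: the duplicate 
rows are still in the table, so the naive "create everything" pass fails again, 
matching the crash loop described above.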
   
   **NOTES**
   - If the scheduler can queue as many or more tasks as there are map indices 
for the task, this won't happen. The provided test case succeeded on the 
DaskExecutor deployment when the default pool was 128; when I reduced that pool 
to 15, the bug occurred.
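
   As a back-of-the-envelope check of this note (illustrative arithmetic only, 
not Airflow internals): with the test DAG's 20 map indices and a default pool of 
15 slots, the first scheduling pass cannot queue every subtask of the first 
task, so the second task ends up being expanded from a partial set of XCom 
results.

   ```python
   # Illustrative arithmetic for the test DAG under a 15-slot default pool.
   map_indices = 20   # map indices per task in the test DAG
   pool_slots = 15    # default pool size that triggers the bug

   first_pass = min(map_indices, pool_slots)  # subtasks queued initially
   remainder = map_indices - first_pass       # subtasks left for a later pass
   partial_expansion = remainder > 0          # second task expands early

   print(first_pass, remainder, partial_expansion)  # 15 5 True
   ```

   With a pool of 128 slots, `remainder` is 0, the second task is expanded from 
a complete XCom set, and the crash does not occur.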
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

