kaojunsong opened a new issue, #23609:
URL: https://github.com/apache/airflow/issues/23609
### Apache Airflow version
2.2.5
### What happened
The following error occurs in the Airflow scheduler:
```
[2022-05-09 11:22:44,122] [INFO] base_executor.py:85 - Adding to queue: ['airflow', 'tasks', 'run', 'stress-ingest', 'ingest', 'manual__2022-05-09T10:31:12.256131+00:00', '--local', '--subdir', 'DAGS_FOLDER/stress_ingest.py']
Exception in thread Thread-7940:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.9/concurrent/futures/process.py", line 317, in run
    result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
  File "/usr/local/lib/python3.9/concurrent/futures/process.py", line 376, in wait_result_broken_or_wakeup
    worker_sentinels = [p.sentinel for p in self.processes.values()]
  File "/usr/local/lib/python3.9/concurrent/futures/process.py", line 376, in <listcomp>
    worker_sentinels = [p.sentinel for p in self.processes.values()]
RuntimeError: dictionary changed size during iteration
```
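For context, this class of error can be reproduced deterministically in plain Python, independent of Airflow: CPython raises `RuntimeError` whenever a dict gains or loses an entry while one of its views is being iterated. In the traceback above, the iteration is `self.processes.values()` inside `concurrent.futures`, with another thread presumably mutating `self.processes` concurrently. A minimal single-threaded sketch of the same failure mode:

```python
# Minimal sketch of the error class seen in the traceback: mutating a
# dict while iterating one of its views raises RuntimeError in CPython.
processes = {"worker-1": 1, "worker-2": 2}

try:
    for sentinel in processes.values():
        # Mutate the dict mid-iteration, as a concurrent thread would.
        processes["worker-3"] = 3
except RuntimeError as exc:
    print(exc)  # dictionary changed size during iteration
```

In the scheduler the mutation comes from another thread rather than the loop body, which is why the failure is intermittent and shows up only under load.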
### What you think should happen instead
No error should occur; the scheduler should keep processing tasks without the executor thread crashing.
### How to reproduce
I created two DAGs; the first triggers the second thousands of times:
```
from datetime import datetime, timedelta
from distutils.command.config import config
from textwrap import dedent

from common.utils import httpclient

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python import PythonOperator

DELEGATOR_END_POINT = 'http://space-common-delegator-clusteripsvc:9049/api/pipelines/dags/stress-ingest/dagRuns/'


def trigger(**kwargs):
    conf = kwargs.get('dag_run').conf
    loop_number = conf["loop"]
    print("Loop: " + str(loop_number))
    for i in range(loop_number):
        httpclient.post(DELEGATOR_END_POINT, conf)
        print("Trigger: " + str(i))


with DAG(
    'stress-test',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='A Stress Test DAG',
    start_date=datetime(2022, 1, 1),
    catchup=False,
    schedule_interval=None,
    tags=['stress', 'test'],
) as dag:
    trigger_via_delegator = PythonOperator(
        task_id='trigger_via_delegator',
        python_callable=trigger,
        dag=dag,
        do_xcom_push=True,
        retries=0,
    )

    trigger_via_delegator
```
```
from datetime import datetime, timedelta
from distutils.command.config import config
from textwrap import dedent

from common.utils import httpclient
from common.utils.httpclient import post, get
from common.constants import INGEST_PULL_END_POINT

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python import PythonOperator


def ingest(**kwargs):
    conf = kwargs.get('dag_run').conf
    body = conf['body']
    post(INGEST_PULL_END_POINT, body, 'stress test')


with DAG(
    'stress-ingest',
    default_args={
        'depends_on_past': False,
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='A Stress Test DAG',
    start_date=datetime(2022, 1, 1),
    catchup=False,
    schedule_interval=None,
    tags=['stress', 'test'],
) as dag:
    trigger_via_delegator = PythonOperator(
        task_id='ingest',
        python_callable=ingest,
        dag=dag,
        do_xcom_push=True,
        retries=0,
    )

    trigger_via_delegator
```
### Operating System
PRETTY_NAME="Debian GNU/Linux 9 (stretch)" NAME="Debian GNU/Linux"
VERSION_ID="9" VERSION="9 (stretch)" VERSION_CODENAME=stretch ID=debian
HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
### Versions of Apache Airflow Providers
```
apache-airflow[kubernetes,postgres,celery,redis,trino,ldap,elasticsearch,amazon,crypto,oracle,jdbc,microsoft-mssql]==2.2.5 \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.5/constraints-3.9.txt"
```
### Deployment
Other Docker-based deployment
### Deployment details
We build Airflow with the following Dockerfile:
```
FROM registry.gitlab.com/zontal/docker/registry/openjdk-python:jdk8-python3.9

# install deps
RUN apt-get update -y && apt-get install -y \
    libczmq-dev \
    python3-dev \
    python2.7-dev \
    libldap2-dev \
    libsasl2-dev \
    ldap-utils \
    tox \
    lcov \
    valgrind \
    libssl-dev \
    inetutils-telnet \
    bind9utils \
    gcc \
    alien \
    && apt-get clean

RUN pip3 install --upgrade pip
RUN pip3 install \
    apache-airflow[kubernetes,postgres,celery,redis,trino,ldap,elasticsearch,amazon,crypto,oracle,jdbc,microsoft-mssql]==2.2.5 \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.5/constraints-3.9.txt"
RUN pip3 install trino psycopg2-binary python-ldap matplotlib retrying \
    authlib cx_Oracle

RUN wget https://download.oracle.com/otn_software/linux/instantclient/214000/oracle-instantclient-basic-21.4.0.0.0-1.el8.x86_64.rpm && \
    alien -i oracle-instantclient-basic-21.4.0.0.0-1.el8.x86_64.rpm
RUN apt-get install -y libaio1

ENV LD_LIBRARY_PATH /usr/lib/oracle/21/client64/lib/${LD_LIBRARY_PATH:+:$LD_LIBRARY_PATH}
ENV ORACLE_HOME /usr/lib/oracle/21/client64
ENV PATH $PATH:$ORACLE_HOME/bin
```
We then run it on Kubernetes (v1.19).
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)