kaojunsong opened a new issue, #23609:
URL: https://github.com/apache/airflow/issues/23609

   ### Apache Airflow version
   
   2.2.5
   
   ### What happened
   
   The following error occurs in the Airflow scheduler:
   
   [2022-05-09 11:22:44,122] [INFO] base_executor.py:85 - Adding to queue: ['airflow', 'tasks', 'run', 'stress-ingest', 'ingest', 'manual__2022-05-09T10:31:12.256131+00:00', '--local', '--subdir', 'DAGS_FOLDER/stress_ingest.py']
   Exception in thread Thread-7940:
   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/threading.py", line 973, in _bootstrap_inner
       self.run()
     File "/usr/local/lib/python3.9/concurrent/futures/process.py", line 317, in run
       result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
     File "/usr/local/lib/python3.9/concurrent/futures/process.py", line 376, in wait_result_broken_or_wakeup
       worker_sentinels = [p.sentinel for p in self.processes.values()]
     File "/usr/local/lib/python3.9/concurrent/futures/process.py", line 376, in <listcomp>
       worker_sentinels = [p.sentinel for p in self.processes.values()]
   RuntimeError: dictionary changed size during iteration
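
For context, here is a minimal sketch of the failure mode named in the traceback, independent of Airflow. It assumes nothing about the real `concurrent.futures` internals: the `workers` dict and both helper functions are hypothetical stand-ins, and the mutation happens in the same thread purely to make the error deterministic. Iterating over a live dict while keys are added raises the same `RuntimeError`; iterating over a `list(...)` snapshot does not. This is not a claim about what the correct fix in the standard library or in Airflow would be.

```python
def collect_unsafe(processes):
    """Iterate the live dict while it is being mutated (hypothetical stand-in
    for another thread adding a worker during iteration)."""
    seen = []
    for pid, name in processes.items():
        seen.append(name)
        processes[pid + 1000] = "new-worker"  # size changes mid-iteration
    return seen


def collect_snapshot(processes):
    """Iterate over a snapshot, so later mutations do not affect the loop."""
    seen = []
    for pid, name in list(processes.items()):
        seen.append(name)
        processes[pid + 1000] = "new-worker"
    return seen


workers = {1: "w1", 2: "w2"}

try:
    collect_unsafe(dict(workers))
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

snapshot_result = collect_snapshot(dict(workers))
print(error_message)
print(snapshot_result)
```

In the scheduler the mutation presumably comes from a different thread than the one iterating, which would make the error intermittent rather than deterministic as it is in this sketch.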
   
   
   ### What you think should happen instead
   
   No error should occur in the scheduler.
   
   ### How to reproduce
   
   I created two DAGs. The first (`stress-test`) triggers the second (`stress-ingest`) thousands of times:
   
   ```
   from datetime import datetime, timedelta
   
   from common.utils import httpclient
   
   # The DAG object; we'll need this to instantiate a DAG
   from airflow import DAG
   
   # Operators; we need this to operate!
   from airflow.operators.python import PythonOperator
   
   DELEGATOR_END_POINT = 'http://space-common-delegator-clusteripsvc:9049/api/pipelines/dags/stress-ingest/dagRuns/'
   
   def trigger(**kwargs):
       # Read the number of runs to trigger from the DAG run configuration
       conf = kwargs.get('dag_run').conf
       loop_number = conf["loop"]
       print("Loop: " + str(loop_number))
       # Each POST to the delegator triggers one run of the stress-ingest DAG
       for i in range(loop_number):
           httpclient.post(DELEGATOR_END_POINT, conf)
           print("Trigger: " + str(i))
   
   
   with DAG(
       'stress-test',
       # These args will get passed on to each operator
       # You can override them on a per-task basis during operator initialization
       default_args={
           'depends_on_past': False,
           'email': ['[email protected]'],
           'email_on_failure': False,
           'email_on_retry': False,
           'retries': 1,
           'retry_delay': timedelta(minutes=5),
       },
       description='A Stress Test DAG',
       start_date=datetime(2022, 1, 1),
       catchup=False,
       schedule_interval=None,
       tags=['stress','test'],
   ) as dag:
   
       trigger_via_delegator = PythonOperator(
           task_id='trigger_via_delegator',
           python_callable=trigger,
           dag=dag,
           do_xcom_push=True,
           retries=0
       )
   
       trigger_via_delegator
   ```
   ```
   from datetime import datetime, timedelta
   
   # The DAG object; we'll need this to instantiate a DAG
   from airflow import DAG
   
   # Operators; we need this to operate!
   from airflow.operators.python import PythonOperator
   from common.utils.httpclient import post
   from common.constants import INGEST_PULL_END_POINT
   
   def ingest(**kwargs):
       # Forward the request body from the DAG run configuration to the ingest endpoint
       conf = kwargs.get('dag_run').conf
       body = conf['body']
       post(INGEST_PULL_END_POINT, body, 'stress test')
   
   
   with DAG(
       'stress-ingest',
       default_args={
           'depends_on_past': False,
           'email': ['[email protected]'],
           'email_on_failure': False,
           'email_on_retry': False,
           'retries': 1,
           'retry_delay': timedelta(minutes=5),
       },
       description='A Stress Test DAG',
       start_date=datetime(2022, 1, 1),
       catchup=False,
       schedule_interval=None,
       tags=['stress','test'],
   ) as dag:
   
       trigger_via_delegator = PythonOperator(
           task_id='ingest',
           python_callable=ingest,
           dag=dag,
           do_xcom_push=True,
           retries=0
       )
   
       trigger_via_delegator
   
   ```
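
For reference, both DAGs read their parameters from `dag_run.conf`: `stress-test` expects an integer `loop`, and `stress-ingest` expects a `body`. A sketch of a payload with that shape follows. The helper `build_conf`, the value `2000`, and the contents of `body` are placeholders for illustration, not the values used in the actual test.

```python
import json


def build_conf(loop, body):
    """Build a dag_run.conf dict with the two keys the DAGs above read.

    Hypothetical helper: only the key names "loop" and "body" come from the DAG code.
    """
    if not isinstance(loop, int) or loop < 1:
        raise ValueError("loop must be a positive integer")
    return {"loop": loop, "body": body}


# Placeholder values; the real loop count and body are not shown in this report.
conf = build_conf(2000, {"source": "placeholder"})
payload = json.dumps(conf)
print(payload)
```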
   
   
   ### Operating System
   
   PRETTY_NAME="Debian GNU/Linux 9 (stretch)" NAME="Debian GNU/Linux" VERSION_ID="9" VERSION="9 (stretch)" VERSION_CODENAME=stretch ID=debian HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support" BUG_REPORT_URL="https://bugs.debian.org/"
   
   ### Versions of Apache Airflow Providers
   
   
   apache-airflow[kubernetes,postgres,celery,redis,trino,ldap,elasticsearch,amazon,crypto,oracle,jdbc,microsoft-mssql]==2.2.5 \
        --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.5/constraints-3.9.txt"
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   We build airflow using the following Dockerfile:
   ```
   FROM registry.gitlab.com/zontal/docker/registry/openjdk-python:jdk8-python3.9
   
   # install deps
   RUN apt-get update -y && apt-get install -y \
       libczmq-dev \
       python3-dev \
       python2.7-dev \
       libldap2-dev \
       libsasl2-dev \
       ldap-utils \
       tox \
       lcov \
       valgrind \
       libssl-dev \
       inetutils-telnet \
       bind9utils \
       gcc \
       alien \
       && apt-get clean
   
   RUN pip3 install --upgrade pip
   
   RUN pip3 install apache-airflow[kubernetes,postgres,celery,redis,trino,ldap,elasticsearch,amazon,crypto,oracle,jdbc,microsoft-mssql]==2.2.5 \
        --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.5/constraints-3.9.txt"
   RUN pip3 install trino psycopg2-binary python-ldap matplotlib retrying authlib cx_Oracle
   
   RUN wget https://download.oracle.com/otn_software/linux/instantclient/214000/oracle-instantclient-basic-21.4.0.0.0-1.el8.x86_64.rpm && \
       alien -i oracle-instantclient-basic-21.4.0.0.0-1.el8.x86_64.rpm
   
   RUN apt-get install libaio1
   ENV LD_LIBRARY_PATH /usr/lib/oracle/21/client64/lib/${LD_LIBRARY_PATH:+:$LD_LIBRARY_PATH}
   ENV ORACLE_HOME /usr/lib/oracle/21/client64
   ENV PATH $PATH:$ORACLE_HOME/bin
   ```
   We then run it on Kubernetes (v1.19).
   
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
