racingjellyfish opened a new issue #19535:
URL: https://github.com/apache/airflow/issues/19535


   ### Apache Airflow version
   
   2.2.1 (latest released)
   
   ### Operating System
   
   Amazon Linux 2
   
   ### Versions of Apache Airflow Providers
   
   apache_airflow-2.2.1.dist-info
   apache_airflow_providers_amazon-2.3.0.dist-info
   apache_airflow_providers_ftp-2.0.1.dist-info
   apache_airflow_providers_http-2.0.1.dist-info
   apache_airflow_providers_imap-2.0.1.dist-info
   apache_airflow_providers_postgres-2.3.0.dist-info
   apache_airflow_providers_sqlite-2.0.1.dist-info
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   pip3 installation to an Amazon Linux 2 based AMI running on an EC2 instance 
(t3.xlarge)
   
   ### What happened
   
   Our DAG runs 8 parallel tasks under the LocalExecutor (parallelism 32), using the `os.fork` option for task execution.
   We use custom operators, based on the Amazon providers, to add Steps to an EMR cluster.
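   For context, the relevant configuration is roughly as follows (a paraphrased sketch, not our exact `airflow.cfg`; `execute_tasks_new_python_interpreter = False` is the default, which makes the LocalExecutor start task runners via `os.fork`):
   ```ini
   [core]
   executor = LocalExecutor
   parallelism = 32
   # False (the default) => task runners are started with os.fork
   execute_tasks_new_python_interpreter = False
   ```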
   
   Seemingly randomly, tasks get stuck in the Running state, producing little 
log output.
   We can see from running `ps ax` that these "stuck" tasks have two task 
supervisor processes running.
   Retrying the task manually, or killing one of the duplicate supervisor processes (which makes Airflow retry the task), results in a successful run.
   
   This intermittent issue renders Airflow unusable in a production setting.
   
   Running processes:
   ```
   21861 ?        S      0:00 airflow worker -- LocalExecutor: ['airflow', 
'tasks', 'run', 'dag1', 'one.four.five.task', 
'backfill__2021-11-11T02:00:00+00:00', '--mark-success', '--local', '--pool', 
'default_pool', '--subdir', 'DAGS_FOLDER/dag1.py', '--cfg-path', 
'/tmp/tmpb6gpqufk']
   23900 ?        Sl     0:05 airflow task supervisor: ['airflow', 'tasks', 
'run', 'dag1', 'one.four.five.task', 'backfill__2021-11-11T02:00:00+00:00', 
'--mark-success', '--local', '--pool', 'default_pool', '--subdir', 
'DAGS_FOLDER/dag1.py', '--cfg-path', '/tmp/tmpb6gpqufk']
   24034 ?        S      0:00 airflow task supervisor: ['airflow', 'tasks', 
'run', 'dag1', 'one.four.five.task', 'backfill__2021-11-11T02:00:00+00:00', 
'--mark-success', '--local', '--pool', 'default_pool', '--subdir', 
'DAGS_FOLDER/dag1.py', '--cfg-path', '/tmp/tmpb6gpqufk']
   ```
   
   Airflow UI task log for a hung task:
   ```
   *** Reading remote log from Cloudwatch log_group: log-group-01 log_stream: 
dag1/one.four.five.task/2021-11-11T02_00_00+00_00/1.log.
   [2021-11-11, 09:08:34 UTC] 
   
--------------------------------------------------------------------------------
   [2021-11-11, 09:08:34 UTC] Starting attempt 1 of 2
   [2021-11-11, 09:08:34 UTC] 
   
--------------------------------------------------------------------------------
   [2021-11-11, 09:08:34 UTC] Marking success for <Task(EmrAddStepsOperator): 
one.four.five.task> on 2021-11-11 02:00:00+00:00
   [2021-11-11, 09:08:34 UTC] Started process 24034 to run task
   ```
   
   Airflow UI task log for a successful task:
   ```
   *** Reading remote log from Cloudwatch log_group: log-group-01 log_stream: 
dag1/one.two.three.task/2021-11-11T02_00_00+00_00/1.log.
   [2021-11-11, 09:07:29 UTC] 
   
--------------------------------------------------------------------------------
   [2021-11-11, 09:07:29 UTC] Starting attempt 1 of 2
   [2021-11-11, 09:07:29 UTC] 
   
--------------------------------------------------------------------------------
   [2021-11-11, 09:07:29 UTC] Marking success for <Task(EmrAddStepsOperator): 
one.two.three.task> on 2021-11-11 02:00:00+00:00
   [2021-11-11, 09:07:29 UTC] Started process 22814 to run task
   [2021-11-11, 09:07:32 UTC] Running <TaskInstance: dag1.one.two.three.task 
backfill__2021-11-11T02:00:00+00:00 [running]> on host 
ip-REDACTED.eu-west-2.compute.internal
   [2021-11-11, 09:07:32 UTC] Marking task as SUCCESS. dag_id=dag1, 
task_id=one.two.three.task, execution_date=20211111T020000, 
start_date=20211111T090518, end_date=20211111T090732
   [2021-11-11, 09:07:34 UTC] State of this instance has been externally set to 
success. Terminating instance.
   [2021-11-11, 09:07:34 UTC] Sending Signals.SIGTERM to GPID 22814
   [2021-11-11, 09:07:34 UTC] Process psutil.Process(pid=22814, 
status='terminated', exitcode=<Negsignal.SIGTERM: -15>, started='09:07:29') 
(22814) terminated with exit code Negsignal.SIGTERM
   ```
   
   ### What you expected to happen
   
   Tasks should not hang in the 'Running' state.
   
   ### How to reproduce
   
   We have found that this issue occurs more often as the number of parallel 
tasks increases.
   
   1. Set up Airflow to run with the LocalExecutor and a parallelism of 32
   1. Create a DAG with 8+ parallel, long-running tasks
   1. Run the DAG repeatedly until a task gets stuck in the 'Running' state
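   A minimal DAG along these lines should reproduce the shape of the setup (a sketch with hypothetical dag/task ids; our real DAG uses custom EMR operators, stood in for here by long `sleep` commands):
   ```python
   from datetime import datetime, timedelta

   from airflow import DAG
   from airflow.operators.bash import BashOperator

   with DAG(
       dag_id="stuck_task_repro",  # hypothetical dag_id
       start_date=datetime(2021, 11, 11),
       schedule_interval=None,
       catchup=False,
       default_args={"retries": 1, "retry_delay": timedelta(minutes=1)},
   ) as dag:
       # 8 independent long-running tasks; the sleep keeps each task
       # "running" long enough to hit the race on a busy executor.
       for i in range(8):
           BashOperator(task_id=f"long_running_{i}", bash_command="sleep 600")
   ```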
   
   ### Anything else
   
   Roughly one DAG run in 10 is affected (with 8 parallel tasks and 40+ pending tasks per run).
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

