vumdao opened a new issue #18183:
URL: https://github.com/apache/airflow/issues/18183


   ### Apache Airflow version
   
   2.1.3 (latest released)
   
   ### Operating System
   
   Kubernetes version: AWS EKS 1.18, using the Celery executor
   
   ### Versions of Apache Airflow Providers
   
   apache/airflow:2.1.3-python3.9
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   Apache Airflow version: 2.1.3
   Python version: 3.9
   
   Kubernetes version: AWS EKS 1.18
   
   Environment: Kubernetes cluster using Celery executor
   
   ```
       AIRFLOW__CORE__PARALLELISM: 100
       AIRFLOW__CELERY__WORKER_CONCURRENCY: 100
       AIRFLOW__CORE__DAG_CONCURRENCY: 100
   ```
   
   ### What happened
   
   Running a DAG with 3 SubDAGs, where each SubDAG runs multiple tasks in parallel, fails on some tasks with the error below.
   
   Error messages:
   ```
   [2021-09-11 16:03:49,894: WARNING/ForkPoolWorker-1] Running <TaskInstance: jk-test.cloudopz.co_run_all_reports 2021-09-11T15:50:12.460553+00:00 [queued]> on host airflow-worker-1.airflow-worker.airflow.svc.cluster.local
   [2021-09-11 16:03:52,231: ERROR/ForkPoolWorker-1] Failed to execute task cannot pickle '_thread.lock' object.
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/executors/celery_executor.py", line 117, in _execute_in_fork
       args.func(args)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 91, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 238, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 121, in _run_task_by_local_task_job
       run_job.run()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 128, in _execute
       self.handle_task_exit(return_code)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 166, in handle_task_exit
       self._run_mini_scheduler_on_child_tasks()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 241, in _run_mini_scheduler_on_child_tasks
       partial_dag = task.dag.partial_subset(
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 1487, in partial_subset
       dag.task_dict = {
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 1488, in <dictcomp>
       t.task_id: copy.deepcopy(t, {id(t.dag): dag})  # type: ignore
     File "/usr/local/lib/python3.9/copy.py", line 153, in deepcopy
       y = copier(memo)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 970, in __deepcopy__
       setattr(result, k, copy.deepcopy(v, memo))
     File "/usr/local/lib/python3.9/copy.py", line 146, in deepcopy
       y = copier(x, memo)
     File "/usr/local/lib/python3.9/copy.py", line 205, in _deepcopy_list
       append(deepcopy(a, memo))
     File "/usr/local/lib/python3.9/copy.py", line 172, in deepcopy
       y = _reconstruct(x, memo, *rv)
     File "/usr/local/lib/python3.9/copy.py", line 270, in _reconstruct
       state = deepcopy(state, memo)
     File "/usr/local/lib/python3.9/copy.py", line 146, in deepcopy
       y = copier(x, memo)
     File "/usr/local/lib/python3.9/copy.py", line 230, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
     File "/usr/local/lib/python3.9/copy.py", line 172, in deepcopy
       y = _reconstruct(x, memo, *rv)
     File "/usr/local/lib/python3.9/copy.py", line 270, in _reconstruct
       state = deepcopy(state, memo)
     File "/usr/local/lib/python3.9/copy.py", line 146, in deepcopy
       y = copier(x, memo)
     File "/usr/local/lib/python3.9/copy.py", line 230, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
     File "/usr/local/lib/python3.9/copy.py", line 172, in deepcopy
       y = _reconstruct(x, memo, *rv)
     File "/usr/local/lib/python3.9/copy.py", line 270, in _reconstruct
       state = deepcopy(state, memo)
     File "/usr/local/lib/python3.9/copy.py", line 146, in deepcopy
       y = copier(x, memo)
     File "/usr/local/lib/python3.9/copy.py", line 230, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
     File "/usr/local/lib/python3.9/copy.py", line 172, in deepcopy
       y = _reconstruct(x, memo, *rv)
     File "/usr/local/lib/python3.9/copy.py", line 270, in _reconstruct
       state = deepcopy(state, memo)
     File "/usr/local/lib/python3.9/copy.py", line 146, in deepcopy
       y = copier(x, memo)
     File "/usr/local/lib/python3.9/copy.py", line 230, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
     File "/usr/local/lib/python3.9/copy.py", line 172, in deepcopy
       y = _reconstruct(x, memo, *rv)
     File "/usr/local/lib/python3.9/copy.py", line 270, in _reconstruct
       state = deepcopy(state, memo)
     File "/usr/local/lib/python3.9/copy.py", line 146, in deepcopy
       y = copier(x, memo)
     File "/usr/local/lib/python3.9/copy.py", line 230, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
     File "/usr/local/lib/python3.9/copy.py", line 161, in deepcopy
       rv = reductor(4)
   TypeError: cannot pickle '_thread.lock' object
   [2021-09-11 16:03:52,269: ERROR/ForkPoolWorker-1] Task airflow.executors.celery_executor.execute_command[865ede47-39a7-4cc5-b467-2eaea74c7e57] raised unexpected: AirflowException('Celery command failed on host: airflow-worker-1.airflow-worker.airflow.svc.cluster.local')
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/celery/app/trace.py", line 412, in trace_task
       R = retval = fun(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/celery/app/trace.py", line 704, in __protected_call__
       return self.run(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command
       _execute_in_fork(command_to_exec)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
       raise AirflowException('Celery command failed on host: ' + get_hostname())
   airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-1.airflow-worker.airflow.svc.cluster.local
   ```
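
The traceback ends inside `copy.deepcopy`, which `partial_subset` calls on each task, so the failure looks like some attribute of a task (nested inside a list and several dicts) holding a `threading.Lock`. The sketch below is not the DAG from this report: `ReportClient`, `FakeOperator` and `find_uncopyable` are hypothetical names used only to reproduce the same `TypeError` with plain Python and to show one possible way to locate the offending attribute.

```python
import copy
import threading


class ReportClient:
    """Hypothetical object that owns a lock (stand-in for a client, hook, etc.)."""

    def __init__(self):
        self._lock = threading.Lock()


class FakeOperator:
    """Hypothetical stand-in for an operator; this is NOT Airflow's BaseOperator."""

    def __init__(self, task_id, op_kwargs):
        self.task_id = task_id
        self.op_kwargs = op_kwargs


def find_uncopyable(obj, path="obj", _seen=None):
    """Return the deepest attribute paths under `obj` that fail copy.deepcopy."""
    seen = set() if _seen is None else _seen
    if id(obj) in seen:
        return []  # already visited; avoids looping on cyclic references
    seen.add(id(obj))
    try:
        copy.deepcopy(obj)
        return []  # this subtree copies fine
    except TypeError:
        pass
    # Descend into containers and instance attributes to narrow the failure down.
    if isinstance(obj, dict):
        children = [(f"{path}[{k!r}]", v) for k, v in obj.items()]
    elif isinstance(obj, (list, tuple, set)):
        children = [(f"{path}[{i}]", v) for i, v in enumerate(obj)]
    elif hasattr(obj, "__dict__"):
        children = [(f"{path}.{k}", v) for k, v in vars(obj).items()]
    else:
        children = []
    found = []
    for child_path, child in children:
        found.extend(find_uncopyable(child, child_path, seen))
    # If no child is to blame, the object itself is the one that cannot be copied.
    return found or [path]


op = FakeOperator("run_all_reports", {"client": ReportClient()})
try:
    copy.deepcopy(op)
except TypeError as exc:
    print("deepcopy failed:", exc)
print(find_uncopyable(op, "op"))
```

If the real cause is similar, creating the lock-holding object inside the task callable instead of passing it as an operator argument would keep it out of the deep copy; that is an assumption about the DAG, which is not included in this report.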
   
   ### What you expected to happen
   
   No error: all tasks in the DAG should run successfully.
   
   ### How to reproduce
   
   Trigger the DAG. On every run, some tasks succeed and some fail.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   



