dangal95 opened a new issue #15252:
URL: https://github.com/apache/airflow/issues/15252


   **Apache Airflow version**: 2.0.1
   
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): 1.20.4
   
   **Environment**: AWS EKS
   
   - **Cloud provider or hardware configuration**: AWS
   - **OS** (e.g. from /etc/os-release): amazon-linux
   - **Others**: Python 3.6.12
   
   **What happened**:
   I am trying to launch Spark jobs on Kubernetes using the SparkKubernetesOperator and SparkKubernetesSensor available through the airflow.providers package. More specifically, I am using these two classes:
   1) http://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_modules/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.html
   2) http://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_api/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes/index.html
   
   I followed this example and it worked:
   https://github.com/apache/airflow/blob/master/airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py
   
   However, I would like to add other tasks to run after the Spark job has executed successfully, so I added a PythonOperator that calls a custom function of mine. The function works on its own, so the problem is not coming from there. Instead, the SparkKubernetesSensor throws this exception:
   
   ```
   [2021-04-07 14:33:17,881] {debug_executor.py:87} ERROR - Failed to execute task: can't pickle _thread.RLock objects.
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/debug_executor.py", line 79, in _run_task
       ti._run_raw_task(job_id=ti.job_id, **params)  # pylint: disable=protected-access
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1176, in _run_raw_task
       self._run_mini_scheduler_on_child_tasks(session)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 62, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1202, in _run_mini_scheduler_on_child_tasks
       include_direct_upstream=True,
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/dag.py", line 1476, in partial_subset
       for t in matched_tasks + also_include
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/dag.py", line 1476, in <dictcomp>
       for t in matched_tasks + also_include
     File "/usr/local/lib/python3.6/copy.py", line 161, in deepcopy
       y = copier(memo)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 832, in __deepcopy__
       setattr(result, k, copy.deepcopy(v, memo))  # noqa
     File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
       y = _reconstruct(x, memo, *rv)
     File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
       state = deepcopy(state, memo)
     File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
       y = copier(x, memo)
     File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
     File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
       y = _reconstruct(x, memo, *rv)
     File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
       state = deepcopy(state, memo)
     File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
       y = copier(x, memo)
     File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
     File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
       y = _reconstruct(x, memo, *rv)
     File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
       state = deepcopy(state, memo)
     File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
       y = copier(x, memo)
     File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
     File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
       y = copier(x, memo)
     File "/usr/local/lib/python3.6/copy.py", line 215, in _deepcopy_list
       append(deepcopy(a, memo))
     File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
       y = _reconstruct(x, memo, *rv)
     File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
       state = deepcopy(state, memo)
     File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
       y = copier(x, memo)
     File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
     File "/usr/local/lib/python3.6/copy.py", line 169, in deepcopy
       rv = reductor(4)
   TypeError: can't pickle _thread.RLock objects
   ```
   
   
   **What you expected to happen**:
   
   I can see that the Spark driver pod completed successfully, so I would expect the SparkKubernetesSensor to finish successfully and Airflow to move on to the next task.
   
   **What do you think went wrong?**:
   I am not sure what the problem is. As I said, this only occurs when there is a task downstream of the sensor; if the sensor is the last step in the DAG, the error does not occur.
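   
   If it helps narrow things down, the bottom of the traceback is a plain `copy.deepcopy` call, and `deepcopy` fails on any object graph that contains a thread lock. A minimal sketch with no Airflow involved (the `Holder` class is purely illustrative, not code from the provider):
   
   ```python
   import copy
   import threading
   
   class Holder:
       """Illustrative stand-in for a task that ends up holding a client with a lock."""
       def __init__(self):
           self.lock = threading.RLock()  # any RLock in the object graph breaks deepcopy
   
   copy.deepcopy(Holder())  # raises TypeError: can't pickle _thread.RLock objects
   ```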
   
   **How to reproduce it**:
   Assuming you have a Spark job operator task called "t1" and a Spark sensor task called "t2", create any other Airflow task ("t3") and set it to run after the sensor, so that inside your DAG: t1 >> t2 >> t3 (see the sketch below).
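   
   For concreteness, here is a minimal sketch of the DAG shape that triggers it, modeled on the linked example DAG; the dag id, namespace, application file, and callable below are placeholders rather than my real configuration:
   
   ```python
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
   from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
   from airflow.utils.dates import days_ago
   
   with DAG("spark_pi_repro", start_date=days_ago(1), schedule_interval=None) as dag:
       # t1: submits the SparkApplication custom resource (manifest name is a placeholder)
       t1 = SparkKubernetesOperator(
           task_id="spark_pi_submit",
           namespace="default",
           application_file="example_spark_kubernetes_spark_pi.yaml",
           do_xcom_push=True,
       )
       # t2: polls the SparkApplication created by t1 until it completes
       t2 = SparkKubernetesSensor(
           task_id="spark_pi_monitor",
           namespace="default",
           application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
       )
       # t3: any downstream task; its mere presence triggers the deepcopy error
       t3 = PythonOperator(task_id="after_spark", python_callable=lambda: None)
   
       t1 >> t2 >> t3
   ```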
   
   

