dangal95 opened a new issue #15252: URL: https://github.com/apache/airflow/issues/15252
**Apache Airflow version**: 2.0.1

**Kubernetes version (if you are using kubernetes)** (use `kubectl version`): 1.20.4

**Environment**: AWS EKS, Python 3.6.12

- **Cloud provider or hardware configuration**: AWS
- **OS** (e.g. from /etc/os-release): amazon-linux
- **Others**: Python 3.6.12

**What happened**:

I am trying to launch Spark jobs on Kubernetes using the SparkKubernetesOperator and SparkKubernetesSensor available through the airflow.providers package. More specifically, I am using these two operators:

1) http://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_modules/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.html
2) http://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_api/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes/index.html

I followed this example and it worked: https://github.com/apache/airflow/blob/master/airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py

However, I would like to add other tasks after the Spark job has finished successfully, so I added a PythonOperator that calls a custom function of mine. The function works on its own, so the problem is not coming from there. Instead, the SparkKubernetesSensor fails with this exception:

```
[2021-04-07 14:33:17,881] {debug_executor.py:87} ERROR - Failed to execute task: can't pickle _thread.RLock objects.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/debug_executor.py", line 79, in _run_task
    ti._run_raw_task(job_id=ti.job_id, **params)  # pylint: disable=protected-access
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1176, in _run_raw_task
    self._run_mini_scheduler_on_child_tasks(session)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1202, in _run_mini_scheduler_on_child_tasks
    include_direct_upstream=True,
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/dag.py", line 1476, in partial_subset
    for t in matched_tasks + also_include
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/dag.py", line 1476, in <dictcomp>
    for t in matched_tasks + also_include
  File "/usr/local/lib/python3.6/copy.py", line 161, in deepcopy
    y = copier(memo)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 832, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))  # noqa
  File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.6/copy.py", line 215, in _deepcopy_list
    append(deepcopy(a, memo))
  File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.6/copy.py", line 169, in deepcopy
    rv = reductor(4)
TypeError: can't pickle _thread.RLock objects
```

**What you expected to happen**:

I can see that the Spark driver pod completed successfully, so I would expect the SparkKubernetesSensor to succeed and Airflow to move on to the next task.

I am not sure what the problem is. As noted above, this only occurs when the sensor has a downstream task. If the sensor is the last step in the DAG, the error does not occur.

**How to reproduce it**:

Assuming you have a Spark job operator task called "t1" and a Spark sensor called "t2", create any other Airflow task ("t3") and set it to run after the sensor, so inside your DAG: `t1 >> t2 >> t3`. A minimal DAG along these lines is sketched below.
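The following is a minimal sketch of the failing DAG shape, modelled on the linked example_spark_kubernetes.py. The dag_id, task_ids, namespace, `spark-pi.yaml` application file, and the `follow_up` callable are placeholders for illustration, not the actual DAG that produced the traceback above.

```python
# Sketch of the t1 >> t2 >> t3 layout that triggers the deepcopy error.
# All names below (dag_id, task_ids, namespace, spark-pi.yaml) are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor


def follow_up():
    # Stand-in for the custom function that should run after the Spark job.
    print("Spark job finished, running downstream task")


with DAG(
    dag_id="spark_pi_repro",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # t1: submits the SparkApplication custom resource to Kubernetes.
    t1 = SparkKubernetesOperator(
        task_id="spark_pi_submit",
        namespace="default",
        application_file="spark-pi.yaml",
        do_xcom_push=True,
    )

    # t2: waits for the SparkApplication submitted by t1 to complete.
    t2 = SparkKubernetesSensor(
        task_id="spark_pi_monitor",
        namespace="default",
        application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
    )

    # t3: any downstream task; its presence after the sensor is what triggers the error.
    t3 = PythonOperator(task_id="follow_up", python_callable=follow_up)

    t1 >> t2 >> t3
```

With only `t1 >> t2` the DAG completes; adding `t3` after the sensor reproduces the "can't pickle _thread.RLock objects" failure shown in the traceback (which, as the first frame shows, was captured while running under the DebugExecutor).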
