martinbikandi commented on issue #56295: URL: https://github.com/apache/airflow/issues/56295#issuecomment-3382097555
Hello, I've had some time to look into this.

The context for the `PythonVirtualenvOperator` is built in the [`get_template_context` function of `taskinstance.py` (line 952)](https://github.com/apache/airflow/blob/v2-11-stable/airflow/models/taskinstance.py). There, the following keys are added to the context:

- `dag`: `<DAG: dag_test>`
- `dag_run`: `<DagRun dag_test @ ... >`
- `macros`: `<module 'airflow.macros' from '/env_directory/site-packages/airflow/macros/__init__.py'>`
- `task`: `<Task(PythonVirtualenvOperator): run_kwarg>`

These 4 keys are included in `AIRFLOW_SERIALIZABLE_CONTEXT_KEYS` of [`_BasePythonVirtualenvOperator` (line 420)](https://github.com/apache/airflow/blob/v2-11-stable/airflow/operators/python.py). The context is filtered before execution in the `execute` method of `_BasePythonVirtualenvOperator`: the serializable keys are obtained via `_iter_serializable_context_keys` (defined at line 885). If `airflow` is included in the requirements, or if `system_site_packages` is set to `True`, the keys from `AIRFLOW_SERIALIZABLE_CONTEXT_KEYS` are kept in the context. The filtered context is created with the [`context_copy_partial` function in `context.py` (line 486)](https://github.com/apache/airflow/blob/v2-11-stable/airflow/utils/context.py).

In the `execute` method of the base `PythonOperator`, the context is updated with the `op_kwargs` passed to the task definition, and the kwargs actually passed to the callable are determined with [`determine_kwargs` of `operator_helpers.py` (line 153)](https://github.com/apache/airflow/blob/v2-11-stable/airflow/utils/operator_helpers.py). In `determine_kwargs`, if a wildcard `**kwargs` parameter is present, everything in the context is returned.

The error finally arises when running `execute_callable` -> `_execute_python_callable_in_subprocess` -> `_write_args`, because none of the 4 context keys listed above is serializable by the default `pickle` serializer.

I tried using dill and cloudpickle as serializers, and with a single task the DAG works:

```python
from airflow import models
from airflow.operators.python import PythonVirtualenvOperator
from airflow.datasets import Dataset


def run_kwarg(**kwargs):
    print(kwargs)


with models.DAG("dag_test", schedule=None, catchup=False) as dag:
    task_run_kwarg = PythonVirtualenvOperator(
        task_id="run_kwarg",
        python_callable=run_kwarg,
        op_kwargs={'context': "{{dag_run.conf}}"},
        system_site_packages=True,
        requirements=["dill"],
        serializer='dill',
    )
```

Using cloudpickle as the serializer works too.

When I add another task that also uses `system_site_packages=True`, the DAG fails:

```python
from airflow import models
from airflow.operators.python import PythonVirtualenvOperator
from airflow.datasets import Dataset


def run_kwarg(**kwargs):
    print(kwargs)


def run_arg(context):
    print(context)


with models.DAG("dag_test2", schedule=None, catchup=False) as dag:
    # fails
    task_run_kwarg = PythonVirtualenvOperator(
        task_id="run_kwarg",
        python_callable=run_kwarg,
        op_kwargs={'context': "{{dag_run.conf}}"},
        system_site_packages=True,
        serializer='dill',
        requirements=['dill'],
    )

    # works
    task_run_arg = PythonVirtualenvOperator(
        task_id="run_arg",
        python_callable=run_arg,
        op_kwargs={'context': "{{dag_run.conf}}"},
        system_site_packages=True,
    )
```

The new error in `task_run_kwarg` is `AttributeError: Can't get attribute 'run_arg'` while unpickling.
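For reference, here is a simplified, hypothetical sketch of the `**kwargs` passthrough described above. It is not the actual `determine_kwargs` implementation (the function name and the toy context values are placeholders), but it shows why `run_arg(context)` only receives the rendered `op_kwargs` while `run_kwarg(**kwargs)` receives every key in the context, including the non-picklable Airflow objects:

```python
import inspect


def determine_kwargs_sketch(func, available_kwargs):
    """Simplified stand-in for operator_helpers.determine_kwargs (hypothetical).

    If the callable declares a **kwargs parameter, every available key is
    forwarded; otherwise only the keys matching named parameters are kept.
    """
    sig = inspect.signature(func)
    has_var_keyword = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()
    )
    if has_var_keyword:
        return dict(available_kwargs)
    return {k: v for k, v in available_kwargs.items() if k in sig.parameters}


def run_arg(context):
    print(context)


def run_kwarg(**kwargs):
    print(kwargs)


# Rendered op_kwargs plus stand-ins for the filtered Airflow context keys.
context = {
    "context": "{{dag_run.conf}}",
    "dag": object(),
    "dag_run": object(),
    "macros": inspect,  # a module, like airflow.macros
    "task": object(),
}

print(determine_kwargs_sketch(run_arg, context).keys())    # only 'context'
print(determine_kwargs_sketch(run_kwarg, context).keys())  # every key, picklable or not
```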
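And a minimal standalone sketch (outside Airflow) of why `_write_args` fails with the default serializer: the `macros` entry is a module object, which `pickle` refuses outright, so writing the kwargs file for the subprocess fails before the callable ever runs:

```python
import pickle
import types

# A bare module object stands in for the `macros` context key.
fake_macros = types.ModuleType("airflow.macros")

try:
    pickle.dumps({"macros": fake_macros})
except TypeError as err:
    print(err)  # cannot pickle 'module' object
```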
