martinbikandi commented on issue #56295:
URL: https://github.com/apache/airflow/issues/56295#issuecomment-3382097555

   Hello, I've had some time to look into this.
   
   The context for the `PythonVirtualenvOperator` is built in [`get_template_context` in `taskinstance.py` (line 952)](https://github.com/apache/airflow/blob/v2-11-stable/airflow/models/taskinstance.py). In this function, among others, the following keys are added to the context:
   
   - dag: <DAG: dag_test>
   - dag_run: <DagRun dag_test @ ... >
   - macros: <module 'airflow.macros' from 
'/env_directory/site-packages/airflow/macros/__init__.py'>
   - task: <Task(PythonVirtualenvOperator): run_kwarg>
   
   These 4 keys are included in `AIRFLOW_SERIALIZABLE_CONTEXT_KEYS` of 
[`_BasePythonVirtualenvOperator` (line 
420)](https://github.com/apache/airflow/blob/v2-11-stable/airflow/operators/python.py).
   
   The context is filtered before execution in the `execute` method of `_BasePythonVirtualenvOperator`; the set of serializable keys is obtained via `_iter_serializable_context_keys`, defined at line 885.
   
   If airflow is included in the requirements, or if `system_site_packages` is set to `True`, the keys from `AIRFLOW_SERIALIZABLE_CONTEXT_KEYS` are kept in the context. The filtered context is then created with [`context_copy_partial` in `context.py` (line 486)](https://github.com/apache/airflow/blob/v2-11-stable/airflow/utils/context.py).
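
   To make the filtering concrete, it roughly amounts to the following (a simplified sketch of my reading of the code, not the actual Airflow implementation; the key sets shown are illustrative subsets only):

   ```python
   # Sketch of the filtering described above; the real logic lives in
   # _BasePythonVirtualenvOperator._iter_serializable_context_keys and
   # airflow.utils.context.context_copy_partial. Key sets are illustrative.
   BASE_SERIALIZABLE_CONTEXT_KEYS = {"ds", "ds_nodash", "ts", "run_id"}
   AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {"macros", "dag", "dag_run", "task"}

   def iter_serializable_context_keys(system_site_packages, requirements):
       yield from BASE_SERIALIZABLE_CONTEXT_KEYS
       if system_site_packages or any("airflow" in r for r in requirements):
           # Airflow objects are only kept when Airflow is importable in the venv.
           yield from AIRFLOW_SERIALIZABLE_CONTEXT_KEYS

   def context_copy_partial(context, keys):
       # Anything not whitelisted is dropped before the context is written to
       # disk for the subprocess.
       return {k: v for k, v in context.items() if k in keys}

   keys = set(iter_serializable_context_keys(True, ["dill"]))
   # context_copy_partial(context, keys) would then keep dag/dag_run/macros/task.
   ```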
   
   In the `execute` method of the base `PythonOperator`, the `context` is updated with the `op_kwargs` passed in the task definition, and the `op_kwargs` actually handed to the callable are then determined by [`determine_kwargs` in `operator_helpers.py` (line 153)](https://github.com/apache/airflow/blob/v2-11-stable/airflow/utils/operator_helpers.py).
   
   In `determine_kwargs`, if a wildcard `**kwargs` parameter is present in the callable's signature, everything in the context is returned.
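
   The wildcard check is essentially a signature inspection. A hedged sketch of the behavior described above (not the actual `determine_kwargs` implementation):

   ```python
   import inspect

   def determine_kwargs_sketch(func, kwargs):
       """Illustrative only: mimics the behavior described above."""
       sig = inspect.signature(func)
       # If the callable declares **kwargs, the whole merged context is passed
       # through and later serialized for the subprocess.
       if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()):
           return dict(kwargs)
       # Otherwise only the parameters the callable explicitly declares are kept.
       return {k: v for k, v in kwargs.items() if k in sig.parameters}

   def run_kwarg(**kwargs):
       print(kwargs)

   # Everything in the context (including `dag`, `task`, `macros`, ...) comes back.
   print(determine_kwargs_sketch(run_kwarg, {"context": "...", "dag": object()}))
   ```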
   
   The error finally arises in `execute_callable` -> `_execute_python_callable_in_subprocess` -> `_write_args`, because none of the 4 context keys listed above can be serialized by the default `pickle` serializer.
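
   The failure is easy to reproduce outside Airflow: a module object (like the `macros` entry above) cannot be pickled by the stdlib `pickle`, whereas `dill` stores imported modules by reference (assuming `dill` is installed):

   ```python
   import math    # stand-in for the non-picklable objects in the context
   import pickle

   import dill

   try:
       pickle.dumps(math)
   except TypeError as err:
       print(err)  # cannot pickle 'module' object

   payload = dill.dumps(math)          # dill serializes the module by reference
   print(dill.loads(payload) is math)  # True
   ```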
   
   I tried using dill and cloudpickle as serializers, and with a single task 
the DAG works:
   
   ```python
   from airflow import models
   from airflow.operators.python import PythonVirtualenvOperator
   
   from airflow.datasets import Dataset
   
   def run_kwarg(**kwargs):
       print(kwargs)
   
   with models.DAG("dag_test",
                   schedule=None,
                   catchup=False) as dag:
   
       task_run_kwarg = PythonVirtualenvOperator(
           task_id="run_kwarg",
           python_callable=run_kwarg,
           op_kwargs={'context': "{{dag_run.conf}}"},
           system_site_packages=True,
           requirements=["dill"],
           serializer='dill'
       )
   ```
   
   Using `cloudpickle` as the serializer works too.
   
   When I add another task that uses `system_site_packages=True`, the DAG fails:
   
   ```python
   from airflow import models
   from airflow.operators.python import PythonVirtualenvOperator
   
   from airflow.datasets import Dataset
   
   def run_kwarg(**kwargs):
       print(kwargs)
   
   def run_arg(context):
       print(context)
   
   with models.DAG("dag_test2",
                   schedule=None,
                   catchup=False) as dag:
   
       # fails
       task_run_kwarg = PythonVirtualenvOperator(
           task_id="run_kwarg",
           python_callable=run_kwarg,
           op_kwargs={'context': "{{dag_run.conf}}"},
           system_site_packages=True,
           serializer='dill',
           requirements=['dill']
       )
   
       # works
       task_run_arg = PythonVirtualenvOperator(
           task_id="run_arg",
           python_callable=run_arg,
           op_kwargs={'context': "{{dag_run.conf}}"},
           system_site_packages=True
       )
   ```
   
   The new error in `task_run_kwarg` is an `AttributeError: Can't get attribute 'run_arg'` raised while unpickling.
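
   For reference, this `Can't get attribute` error is the generic failure mode of reference-based (un)pickling of functions: plain functions are stored as a module plus qualified name, so that name has to be resolvable in the process doing the unpickling. A minimal, Airflow-independent illustration of that error class (not a claim about where exactly the reference to `run_arg` is created inside Airflow):

   ```python
   import pickle

   def run_arg(context):
       print(context)

   payload = pickle.dumps(run_arg)  # stored by reference, not by value

   # Simulate a subprocess in which the DAG file's symbols are not importable.
   del run_arg

   try:
       pickle.loads(payload)
   except AttributeError as err:
       print(err)  # Can't get attribute 'run_arg' on <module '__main__' ...>
   ```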

