Felix-neko opened a new issue, #35529:
URL: https://github.com/apache/airflow/issues/35529
### Apache Airflow version
2.7.3
### What happened
Hi folks!
I have to use the `PythonVirtualenvOperator` operator and pass it `{{ dag_run
}}`, `{{ task_instance }}`, and other Airflow context variables. Sometimes
it crashes with the following error:
```
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO - Traceback (most recent call last):
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqbspm8nx/script.py", line 17, in <module>
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -     arg_dict = dill.load(file)
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqbspm8nx/lib/python3.10/site-packages/dill/_dill.py", line 373, in load
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -     return Unpickler(file, ignore=ignore, **kwds).load()
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqbspm8nx/lib/python3.10/site-packages/dill/_dill.py", line 646, in load
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -     obj = StockUnpickler.load(self)
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqbspm8nx/lib/python3.10/site-packages/dill/_dill.py", line 636, in find_class
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -     return StockUnpickler.find_class(self, module, name)
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO - ModuleNotFoundError: No module named 'unusual_prefix_7da5b81975a8caeba2f4e2b91b352e55493c2e25_dag'
[2023-11-08, 11:30:20 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
```
After some testing I found that this error occurs if any operator in the DAG
(possibly an operator other than `PythonVirtualenvOperator`) takes a function
as its `python_callable` argument and that function is defined in the same
Python source file as the DAG object.
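The mechanism can be demonstrated with the standard library alone. Like `dill`, plain `pickle` stores an ordinary module-level function by reference (module name plus qualified name). Airflow's DAG loader imports the DAG file under a mangled name such as `unusual_prefix_<hash>_dag`, and no module by that name exists inside the virtualenv subprocess. This is a minimal sketch of that failure mode (the module name below is made up):

```python
import pickle
import sys
import types

# Simulate Airflow importing the DAG file under a mangled module name.
mod = types.ModuleType("unusual_prefix_deadbeef_dag")  # hypothetical hash
exec("def make_foo():\n    return 'foo'", mod.__dict__)
sys.modules[mod.__name__] = mod

# pickle serializes the function as (module name, qualified name).
payload = pickle.dumps(mod.make_foo)

# The virtualenv subprocess never had such a module.
del sys.modules[mod.__name__]
missing = None
try:
    pickle.loads(payload)
except ModuleNotFoundError as exc:
    missing = exc.name
print(f"No module named {missing!r}")
```

Moving `make_foo` into an importable module (as in the workaround below) makes the reference resolvable again.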
### What you think should happen instead
I think that `airflow` should check its DAGs before running (or before
serialization) and give an informative error message in the following case: if
there is a `PythonVirtualenvOperator` in the DAG whose `python_callable`
function is declared in the same Python module as the DAG itself.
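Such a check could look roughly like this sketch. The function and parameter names (`check_callables`, `dag_module_name`) are hypothetical, not Airflow API; only `dag.tasks`, `task.task_id`, and the `python_callable` attribute are existing Airflow attributes:

```python
# Hypothetical pre-flight check: flag tasks whose callable is defined in
# the DAG file itself, since the venv subprocess cannot import that module.
def check_callables(dag, dag_module_name):
    problems = []
    for task in dag.tasks:
        fn = getattr(task, "python_callable", None)
        if fn is not None and getattr(fn, "__module__", None) == dag_module_name:
            problems.append(
                f"task {task.task_id!r}: callable {fn.__name__!r} is defined "
                f"in the DAG module {dag_module_name!r} and will fail to "
                "unpickle inside the virtualenv"
            )
    return problems
```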
And, for the future, it would be really cool if Airflow migrated to
`cloudpickle` so that such functions are deserialized correctly.
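For illustration, this is a stdlib-only sketch of the "by value" idea behind `cloudpickle`: ship the compiled code object itself rather than a `(module, name)` reference, so the receiving process never needs the defining module. (Real `cloudpickle` additionally handles globals, closures, defaults, and so on; this sketch covers none of that.)

```python
import marshal
import types

def make_foo(*args, **kwargs):
    return f"foo args={args} kwargs={kwargs}"

# Serialize the code object itself, not a reference to its module.
payload = marshal.dumps(make_foo.__code__)

# "Receiving side": rebuild a callable from the code object alone.
rebuilt = types.FunctionType(marshal.loads(payload), {})
print(rebuilt("a", k=1))
```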
### How to reproduce
Here's a minimal example that reproduces the error (it should be tested with
`airflow standalone`; it occurs with `SequentialExecutor` or
`KubernetesExecutor`, but not with `DebugExecutor`):
```
import datetime

import pendulum
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import dill

dag = DAG(
    dag_id='strange_pickling_error_dag',
    schedule_interval='0 5 * * 1',
    start_date=datetime.datetime(2020, 1, 1),
    catchup=False,
    render_template_as_native_obj=True,
)

context = {"ts": "{{ ts }}", "dag_run": "{{ dag_run }}"}


def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)


make_foo_task = PythonVirtualenvOperator(
    task_id='make_foo',
    python_callable=make_foo,
    use_dill=True,
    system_site_packages=False,
    op_args=[context],
    requirements=[f"dill=={dill.__version__}",
                  f"apache-airflow=={airflow.__version__}",
                  "psycopg2-binary >= 2.9, < 3",
                  f"pendulum=={pendulum.__version__}",
                  "lazy-object-proxy"],
    dag=dag)
```
And here's my workaround code, with the callable moved to a separate module:
- `dags/strange_pickling_error/dag.py`:
```
import datetime

import pendulum
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import dill

from strange_pickling_error.some_moar_code import make_foo

dag = DAG(
    dag_id='strange_pickling_error_dag',
    schedule_interval='0 5 * * 1',
    start_date=datetime.datetime(2020, 1, 1),
    catchup=False,
    render_template_as_native_obj=True,
)

context = {"ts": "{{ ts }}", "dag_run": "{{ dag_run }}"}

make_foo_task = PythonVirtualenvOperator(
    task_id='make_foo',
    python_callable=make_foo,
    use_dill=True,
    system_site_packages=False,
    op_args=[context],
    requirements=[f"dill=={dill.__version__}",
                  f"apache-airflow=={airflow.__version__}",
                  "psycopg2-binary >= 2.9, < 3",
                  f"pendulum=={pendulum.__version__}",
                  "lazy-object-proxy"],
    dag=dag)
```
- `dags/strange_pickling_error/some_moar_code.py`:
```
def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)
```
### Operating System
Ubuntu 22.04
### Versions of Apache Airflow Providers
apache-airflow-providers-common-sql==1.8.0
apache-airflow-providers-ftp==3.6.0
apache-airflow-providers-google==10.11.0
apache-airflow-providers-http==4.6.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-sqlite==3.5.0
### Deployment
Virtualenv installation
### Deployment details
Python 3.10
airflow==2.7.3
dill==0.3.5.1
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)