BenediktRamsauer opened a new issue, #40585: URL: https://github.com/apache/airflow/issues/40585
### Apache Airflow Provider(s)

cncf-kubernetes

### Versions of Apache Airflow Providers

8.3.2

### Apache Airflow version

2.9.2

### Operating System

rhel

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### What happened

Type annotations that rely on imported names (for example from `typing`) cause an error when the function is decorated with `@task.kubernetes`. See below for the specific errors.

### What you think should happen instead

A solution could be to adjust the `get_python_source` function here: https://github.com/apache/airflow/blob/main/airflow/decorators/base.py#L301. We could add a step that removes type annotations from the source (from the function parameters and, if desired, from the function body as well). I have a fix ready using `ast` and `astunparse`.

### How to reproduce

```
from typing import Any
from airflow.decorators import dag, task


class MyObject:
    var: str = 'foo'


POD_CONFIG = dict(
    image='apache/airflow:slim-latest-python3.11',
    image_pull_policy="Always",
    namespace=None,
    get_logs=True,
    log_events_on_failure=True,
    do_xcom_push=True,
)


@dag(
    schedule=None,
)
def kpo_bug_test():
    # Control case: no annotations at all.
    @task.kubernetes(**POD_CONFIG)
    def my_task0(var='foo'):
        pass

    # Parameter annotated with a name imported from `typing`.
    @task.kubernetes(**POD_CONFIG)
    def my_task1(var: Any ='foo'):
        print(var)

    # Parameter annotated with, and defaulting to, a class defined in the DAG file.
    @task.kubernetes(**POD_CONFIG)
    def my_task2(var: MyObject=MyObject()) :
        pass

    # Annotation on a local variable inside the function body.
    @task.kubernetes(**POD_CONFIG)
    def my_task3():
        var: Any = 'foo'
        print(var)

    my_task0()
    my_task1()
    my_task2()
    my_task3()


kpo_bug_test()
```

I obtain the following results:

- `my_task0` runs, as expected, which shows that the setup itself is fine.
- `my_task1` fails, as expected, because `from typing import Any` is not added to the `/tmp/script.py` file.
  The interesting log portion:

  > [2024-07-03, 11:48:09 UTC] {pod_manager.py:472} INFO - [base] + python /tmp/script.py /tmp/script.in /airflow/xcom/return.json
  > [2024-07-03, 11:48:09 UTC] {pod_manager.py:472} INFO - [base] Traceback (most recent call last):
  > [2024-07-03, 11:48:09 UTC] {pod_manager.py:472} INFO - [base] File "/tmp/script.py", line 19, in <module>
  > [2024-07-03, 11:48:09 UTC] {pod_manager.py:472} INFO - [base] def my_task1(var: Any ='foo'):
  > [2024-07-03, 11:48:09 UTC] {pod_manager.py:472} INFO - [base] ^^^
  > [2024-07-03, 11:48:09 UTC] {pod_manager.py:490} INFO - [base] NameError: name 'Any' is not defined. Did you mean: 'any'?

- `my_task2` also fails, as expected, because the definition of `MyObject` is not copied to `/tmp/script.py`, and it may never be available since the created pod might not contain the class definition.

  > [2024-07-03, 11:48:20 UTC] {pod_manager.py:472} INFO - [base] + python /tmp/script.py /tmp/script.in /airflow/xcom/return.json
  > [2024-07-03, 11:48:20 UTC] {pod_manager.py:472} INFO - [base] Traceback (most recent call last):
  > [2024-07-03, 11:48:20 UTC] {pod_manager.py:472} INFO - [base] File "/tmp/script.py", line 19, in <module>
  > [2024-07-03, 11:48:20 UTC] {pod_manager.py:472} INFO - [base] def my_task2(var: MyObject=MyObject()) :
  > [2024-07-03, 11:48:20 UTC] {pod_manager.py:472} INFO - [base] ^^^^^^^^
  > [2024-07-03, 11:48:20 UTC] {pod_manager.py:490} INFO - [base] NameError: name 'MyObject' is not defined. Did you mean: 'object'?

- For `my_task3`, I was expecting a failure since `from typing import Any` is not in the `/tmp/script.py` file (as for `my_task1`). However, it does not raise an error, most likely because annotations on local variables inside a function body are not evaluated at runtime. So there is nothing to fix for this case.

### Anything else

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!
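The annotation-stripping step proposed under "What you think should happen instead" could look roughly like the sketch below. It is only an illustration under a few assumptions: the names `StripAnnotations` and `strip_annotations` are hypothetical, it uses the standard-library `ast.unparse` (Python 3.9+) in place of `astunparse`, and it is not wired into `get_python_source`.

```python
import ast


class StripAnnotations(ast.NodeTransformer):
    """Hypothetical transformer that drops type annotations from source code."""

    def _strip_function(self, node):
        # Drop the return annotation and every parameter annotation.
        node.returns = None
        args = node.args
        for arg in args.posonlyargs + args.args + args.kwonlyargs:
            arg.annotation = None
        if args.vararg is not None:
            args.vararg.annotation = None
        if args.kwarg is not None:
            args.kwarg.annotation = None
        # Recurse so nested functions and annotated assignments are handled too.
        self.generic_visit(node)
        return node

    visit_FunctionDef = _strip_function
    visit_AsyncFunctionDef = _strip_function

    def visit_AnnAssign(self, node):
        if node.value is None:
            # A bare "x: T" has no value to keep; use "pass" so the
            # enclosing body never becomes empty.
            return ast.copy_location(ast.Pass(), node)
        # "x: T = value" becomes "x = value".
        assign = ast.Assign(targets=[node.target], value=node.value, type_comment=None)
        return ast.copy_location(assign, node)


def strip_annotations(source: str) -> str:
    """Return `source` with type annotations removed (hypothetical helper)."""
    tree = StripAnnotations().visit(ast.parse(source))
    ast.fix_missing_locations(tree)
    return ast.unparse(tree)


example_source = """
def my_task1(var: Any = 'foo') -> None:
    local: Any = var
    print(local)
"""

stripped = strip_annotations(example_source)
print(stripped)
```

Note that this would only help with the `my_task1` case. In `my_task2` the default value `MyObject()` is an ordinary expression, not an annotation, so it would still be evaluated when the function is defined in the pod and would still need `MyObject` to exist there. Replacing a bare annotation with `pass` also changes behaviour for classes that rely on bare annotations (dataclasses, for instance), so a real fix might restrict itself to function signatures.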
### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
