vchiapaikeo opened a new issue, #28933:
URL: https://github.com/apache/airflow/issues/28933
### Apache Airflow Provider(s)
cncf-kubernetes
### Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes 5.0.0
### Apache Airflow version
2.5.0
### Operating System
Debian 11
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### What happened
When passing arguments (either args or kwargs) to a @task.kubernetes
decorated function, the following exception occurs:
Task Logs:
```
[2023-01-13, 22:05:40 UTC] {kubernetes_pod.py:621} INFO - Building pod
k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55 with labels:
{'dag_id': 'test_k8s_input_1673647477', 'task_id': 'k8s_with_input', 'run_id':
'backfill__2023-01-01T0000000000-c16e0472d', 'kubernetes_pod_operator': 'True',
'try_number': '1'}
[2023-01-13, 22:05:40 UTC] {kubernetes_pod.py:404} INFO - Found matching pod
k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55 with labels
{'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.5.0', 'dag_id':
'test_k8s_input_1673647477', 'kubernetes_pod_operator': 'True', 'run_id':
'backfill__2023-01-01T0000000000-c16e0472d', 'task_id': 'k8s_with_input',
'try_number': '1'}
[2023-01-13, 22:05:40 UTC] {kubernetes_pod.py:405} INFO - `try_number` of
task_instance: 1
[2023-01-13, 22:05:40 UTC] {kubernetes_pod.py:406} INFO - `try_number` of
pod: 1
[2023-01-13, 22:05:40 UTC] {pod_manager.py:189} WARNING - Pod not yet
started: k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55
[2023-01-13, 22:05:41 UTC] {pod_manager.py:189} WARNING - Pod not yet
started: k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55
[2023-01-13, 22:05:42 UTC] {pod_manager.py:189} WARNING - Pod not yet
started: k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55
[2023-01-13, 22:05:43 UTC] {pod_manager.py:189} WARNING - Pod not yet
started: k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55
[2023-01-13, 22:05:44 UTC] {pod_manager.py:237} INFO - + python -c 'import
base64, os;x = os.environ["__PYTHON_SCRIPT"];f = open("/tmp/script.py", "w");
f.write(x); f.close()'
[2023-01-13, 22:05:44 UTC] {pod_manager.py:237} INFO - + python
/tmp/script.py
[2023-01-13, 22:05:44 UTC] {pod_manager.py:237} INFO - Traceback (most
recent call last):
[2023-01-13, 22:05:44 UTC] {pod_manager.py:237} INFO - File
"/tmp/script.py", line 14, in <module>
[2023-01-13, 22:05:44 UTC] {pod_manager.py:237} INFO - with
open(sys.argv[1], "rb") as file:
[2023-01-13, 22:05:44 UTC] {pod_manager.py:237} INFO - IndexError: list
index out of range
[2023-01-13, 22:05:44 UTC] {kubernetes_pod.py:499} INFO - Deleting pod:
k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55
[2023-01-13, 22:05:44 UTC] {taskinstance.py:1772} ERROR - Task failed with
exception
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes.py",
line 104, in execute
return super().execute(context)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py",
line 217, in execute
return_value = super().execute(context)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py",
line 465, in execute
self.cleanup(
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py",
line 489, in cleanup
raise AirflowException(
airflow.exceptions.AirflowException: Pod
k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55 returned a
failure:
```
### What you think should happen instead
K8's decorator should properly receive input. The [python command invoked
here](https://github.com/apache/airflow/blob/2.5.0/airflow/providers/cncf/kubernetes/decorators/kubernetes.py#L75)
does not pass input. Contrast this with the [docker version of the
decorator](https://github.com/apache/airflow/blob/2.5.0/airflow/providers/docker/decorators/docker.py#L105)
which does properly pass pickled input.
### How to reproduce
Create a dag:
```py
import os
from airflow import DAG
from airflow.decorators import task
DEFAULT_TASK_ARGS = {
"owner": "gcp-data-platform",
"start_date": "2022-12-16",
"retries": 0,
}
@task.kubernetes(
image="python:3.8-slim-buster",
namespace=os.getenv("AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE"),
in_cluster=False,
)
def k8s_with_input(val: str) -> str:
import datetime
print(f"Got val: {val}")
return val
with DAG(
schedule_interval="@daily",
max_active_runs=1,
max_active_tasks=5,
catchup=False,
dag_id="test_oom_dag",
default_args=DEFAULT_TASK_ARGS,
) as dag:
output = k8s_with_input.override(task_id="k8s_with_input")("a")
```
Run and observe failure:
<img width="907" alt="image"
src="https://user-images.githubusercontent.com/9200263/212427952-15466317-4e61-4b71-9971-2cdedba4f7ba.png">
Task logs above.
### Anything else
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]