GitHub user ankurbansal-tradedoubler edited a discussion: Getting stuck with 
Airflow Flink K8s job submission failure.

Hi,
I get an error when I pass the FlinkDeployment configuration as a path to a 
separate YAML file, but everything works fine if I pass the YAML config as 
a string to FlinkKubernetesOperator's application_file parameter.
Flink version - 1.19.1
Airflow version -  2.9.3
K8s version  - 1.29.1
Below is the error snippet.

```
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"the object provided is unrecognized (must be of type FlinkDeployment): couldn't get version/kind; json parse error: json: cannot unmarshal string into Go value of type struct { APIVersion string \"json:\\\"apiVersion,omitempty\\\"\"; Kind string \"json:\\\"kind,omitempty\\\"\" } (222f6b38732d6a6f62732f666c696e6b2f666c696e6b2d62617369632e79 ...)","reason":"BadRequest","code":400}
2024-11-04 16:01:40.497 | HTTP response headers: HTTPHeaderDict({'Audit-Id': 'de6ea460-9da8-4166-8164-0b64210a1ced', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'bec64d75-602f-463b-a128-12129b97129b', 'X-Kubernetes-Pf-Prioritylevel-Uid': '5530462b-a13b-471c-957c-157fff1ae78b', 'Date': 'Mon, 04 Nov 2024 15:01:40 GMT', 'Content-Length': '461'})
2024-11-04 16:01:40.497 | Reason: Bad Request
2024-11-04 16:01:40.497 | kubernetes.client.exceptions.ApiException: (400)
2024-11-04 16:01:40.497 | raise ApiException(http_resp=r)
2024-11-04 16:01:40.497 | File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/rest.py", line 238, in request
2024-11-04 16:01:40.497 | ^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-04 16:01:40.497 | return self.request("POST", url,
2024-11-04 16:01:40.497 | File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/rest.py", line 279, in POST
2024-11-04 16:01:40.497 | ^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-04 16:01:40.497 | return self.rest_client.POST(url,
2024-11-04 16:01:40.497 | File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 391, in request
2024-11-04 16:01:40.497 | ^^^^^^^^^^^^^
2024-11-04 16:01:40.497 | response_data = self.request(
2024-11-04 16:01:40.497 | File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
2024-11-04 16:01:40.497 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-04 16:01:40.497 | return self.__call_api(resource_path, method,
2024-11-04 16:01:40.497 | File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 348, in call_api
2024-11-04 16:01:40.497 | ^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-04 16:01:40.497 | return self.api_client.call_api(
2024-11-04 16:01:40.497 | File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api/custom_objects_api.py", line 354, in create_namespaced_custom_object_with_http_info
2024-11-04 16:01:40.497 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-04 16:01:40.497 | return self.create_namespaced_custom_object_with_http_info(group, version, namespace, plural, body, **kwargs)  # noqa: E501
2024-11-04 16:01:40.497 | File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api/custom_objects_api.py", line 231, in create_namespaced_custom_object
2024-11-04 16:01:40.497 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-04 16:01:40.497 | response = api.create_namespaced_custom_object(
2024-11-04 16:01:40.497 | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 328, in create_custom_object
2024-11-04 16:01:40.497 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-04 16:01:40.497 | response = self.hook.create_custom_object(
2024-11-04 16:01:40.497 | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/flink/operators/flink_kubernetes.py", line 110, in execute
2024-11-04 16:01:40.497 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-04 16:01:40.497 | return func(self, *args, **kwargs)
2024-11-04 16:01:40.497 | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
2024-11-04 16:01:40.497 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-04 16:01:40.497 | return execute_callable(context=context, **execute_callable_kwargs)
2024-11-04 16:01:40.497 | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
2024-11-04 16:01:40.497 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-04 16:01:40.497 | result = _execute_callable(context=context, **execute_callable_kwargs)
2024-11-04 16:01:40.497 | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
2024-11-04 16:01:40.497 | Traceback (most recent call last):
2024-11-04 16:01:40.497 | [2024-11-04T15:01:40.495+0000] {taskinstance.py:2905} ERROR - Task failed with exception
2024-11-04 16:01:40.495 | [2024-11-04T15:01:40.495+0000] {taskinstance.py:441} INFO - ::group::Post task execution logs
2024-11-04 16:01:40.493 | [2024-11-04T15:01:40.493+0000] {flink_kubernetes.py:109} INFO - body=self.application_file: /k8s-jobs/flink/flink-basic.yaml
```
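
The hex string in parentheses inside the "json parse error" message appears to be the first bytes of the request body that the API server received. A minimal sketch for decoding it is below; the variable names are only illustrative, and the hex value is copied from the response above.

```python
# Hex prefix copied from the "json parse error" part of the API response.
hex_prefix = "222f6b38732d6a6f62732f666c696e6b2f666c696e6b2d62617369632e79"

# Interpret each pair of hex digits as one byte and decode as UTF-8 text.
decoded = bytes.fromhex(hex_prefix).decode("utf-8")
print(decoded)
```

The decoded text starts with a double quote followed by `/k8s-jobs/flink/flink-basic.y`, which suggests that the quoted file path itself, and not the content of the file, was sent as the request body.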


Update: I suspected that the file content might not be readable, so I read the 
YAML file from the same path myself and passed its content to application_file 
as a string. That works fine as well. 
Do you know if there is a limitation in FlinkKubernetesOperator that prevents 
the application_file parameter from parsing a file path?
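
For reference, the workaround from the update can be sketched roughly as follows. The helper name `read_manifest`, the `task_id`, and the `namespace` value are placeholders for illustration and are not taken from the actual DAG; the operator call is shown only in comments so the block runs without Airflow installed.

```python
from pathlib import Path


def read_manifest(path: str) -> str:
    """Return the raw text of a manifest file (hypothetical helper)."""
    return Path(path).read_text(encoding="utf-8")


# Usage inside a DAG file (not executed here; task_id and namespace are
# placeholder values):
#
#   from airflow.providers.apache.flink.operators.flink_kubernetes import (
#       FlinkKubernetesOperator,
#   )
#
#   submit = FlinkKubernetesOperator(
#       task_id="submit_flink_job",
#       namespace="default",
#       application_file=read_manifest("/k8s-jobs/flink/flink-basic.yaml"),
#   )
```

With this approach the operator receives the YAML text directly, which is the case that works in my setup.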

Any help or guidance is appreciated.



GitHub link: https://github.com/apache/airflow/discussions/43639
