vijayakumarpattanashetti opened a new issue #17371: URL: https://github.com/apache/airflow/issues/17371
**Apache Airflow version**: 2.0.1

**Kubernetes version**: 1.20.7

**Driver**: Docker 20.10.7

**Environment**: Local minikube

- **Cloud provider or hardware configuration**: 2 GHz Quad-Core Intel Core i5 Processor, 16GB RAM, 500GB SSD
- **OS**: macOS Catalina Version 10.15.7
- **Kernel**: Darwin Kernel Version 19.6.0: Thu Oct 29 22:56:45 PDT 2020; root:xnu-6153.141.2.2~1/RELEASE_X86_64 x86_64
- **Install tools**: Airflow Helm chart (https://github.com/airflow-helm/charts/tree/main/charts/airflow) version: 8.3.2, Spark Operator Helm chart (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/charts/spark-operator-chart) version 1.1.3

**What happened**:

When the path to an existing SparkApplication YAML/JSON file is supplied as the `application_file` value, the operator loads the path string itself instead of the contents of the file, so the Kubernetes API server rejects the request with a JSON parse error.

<details><summary>worker.log</summary>

*** Reading remote log from s3://airflow-logs/spark_pi_try/spark_pi_trial/2021-07-30T09:07:58.926987+00:00/1.log. 
[2021-07-30 09:08:06,170] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: spark_pi_try.spark_pi_trial 2021-07-30T09:07:58.926987+00:00 [queued]> [2021-07-30 09:08:06,178] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: spark_pi_try.spark_pi_trial 2021-07-30T09:07:58.926987+00:00 [queued]> [2021-07-30 09:08:06,178] {taskinstance.py:1042} INFO - -------------------------------------------------------------------------------- [2021-07-30 09:08:06,178] {taskinstance.py:1043} INFO - Starting attempt 1 of 1 [2021-07-30 09:08:06,178] {taskinstance.py:1044} INFO - -------------------------------------------------------------------------------- [2021-07-30 09:08:06,190] {taskinstance.py:1063} INFO - Executing <Task(SparkKubernetesOperator): spark_pi_trial> on 2021-07-30T09:07:58.926987+00:00 [2021-07-30 09:08:06,192] {standard_task_runner.py:52} INFO - Started process 26 to run task [2021-07-30 09:08:06,196] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'spark_pi_try', 'spark_pi_trial', '2021-07-30T09:07:58.926987+00:00', '--job-id', '3', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/spark-opr-dag2.py', '--cfg-path', '/tmp/tmpm_sedozj', '--error-file', '/tmp/tmp7gd166ej'] [2021-07-30 09:08:06,197] {standard_task_runner.py:77} INFO - Job 3: Subtask spark_pi_trial [2021-07-30 09:08:06,302] {logging_mixin.py:104} INFO - Running <TaskInstance: spark_pi_try.spark_pi_trial 2021-07-30T09:07:58.926987+00:00 [running]> on host sparkpitrysparkpitrial.05ac566507de466a8c0cfe980bdc00d0 [2021-07-30 09:08:06,382] {logging_mixin.py:104} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:332 DeprecationWarning: The remote_logging option in [core] has been moved to the remote_logging option in [logging] - the old setting has been used, but please update your config. 
[2021-07-30 09:08:06,383] {logging_mixin.py:104} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:332 DeprecationWarning: The remote_log_conn_id option in [core] has been moved to the remote_log_conn_id option in [logging] - the old setting has been used, but please update your config. [2021-07-30 09:08:06,384] {logging_mixin.py:104} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:332 DeprecationWarning: The remote_base_log_folder option in [core] has been moved to the remote_base_log_folder option in [logging] - the old setting has been used, but please update your config. [2021-07-30 09:08:06,385] {logging_mixin.py:104} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:332 DeprecationWarning: The encrypt_s3_logs option in [core] has been moved to the encrypt_s3_logs option in [logging] - the old setting has been used, but please update your config. [2021-07-30 09:08:06,464] {taskinstance.py:1255} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_OWNER=airflow AIRFLOW_CTX_DAG_ID=spark_pi_try AIRFLOW_CTX_TASK_ID=spark_pi_trial AIRFLOW_CTX_EXECUTION_DATE=2021-07-30T09:07:58.926987+00:00 AIRFLOW_CTX_DAG_RUN_ID=manual__2021-07-30T09:07:58.926987+00:00 [2021-07-30 09:08:06,464] {spark_kubernetes.py:60} INFO - Creating sparkApplication [2021-07-30 09:08:06,495] {taskinstance.py:1455} ERROR - Exception when calling -> create_custom_object: (400) Reason: Bad Request HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'efbb7e0c-9754-45e5-ba08-360475af8161', 'X-Kubernetes-Pf-Prioritylevel-Uid': '67d4be08-a335-4cf1-a9cd-74d050906822', 'Date': 'Fri, 30 Jul 2021 09:08:06 GMT', 'Content-Length': '462'}) HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"the object provided is unrecognized (must be of type SparkApplication): 
couldn't get version/kind; json parse error: json: cannot unmarshal string into Go value of type struct { APIVersion string \"json:\\\"apiVersion,omitempty\\\"\"; Kind string \"json:\\\"kind,omitempty\\\"\" } (222f6f70742f616972666c6f772f646167732f737061726b2d70692d6672 ...)","reason":"BadRequest","code":400} Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 166, in create_custom_object response = api.create_namespaced_custom_object( File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 183, in create_namespaced_custom_object (data) = self.create_namespaced_custom_object_with_http_info(group, version, namespace, plural, body, **kwargs) # noqa: E501 File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 275, in create_namespaced_custom_object_with_http_info return self.api_client.call_api( File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 340, in call_api return self.__call_api(resource_path, method, File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 172, in __call_api response_data = self.request( File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 382, in request return self.rest_client.POST(url, File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 272, in POST return self.request("POST", url, File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 231, in request raise ApiException(http_resp=r) kubernetes.client.rest.ApiException: (400) Reason: Bad Request HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'efbb7e0c-9754-45e5-ba08-360475af8161', 
'X-Kubernetes-Pf-Prioritylevel-Uid': '67d4be08-a335-4cf1-a9cd-74d050906822', 'Date': 'Fri, 30 Jul 2021 09:08:06 GMT', 'Content-Length': '462'}) HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"the object provided is unrecognized (must be of type SparkApplication): couldn't get version/kind; json parse error: json: cannot unmarshal string into Go value of type struct { APIVersion string \"json:\\\"apiVersion,omitempty\\\"\"; Kind string \"json:\\\"kind,omitempty\\\"\" } (222f6f70742f616972666c6f772f646167732f737061726b2d70692d6672 ...)","reason":"BadRequest","code":400} During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks result = self._execute_task(context, task_copy) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task result = task_copy.execute(context=context) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 62, in execute response = hook.create_custom_object( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 172, in create_custom_object raise AirflowException(f"Exception when calling -> create_custom_object: {e}\n") airflow.exceptions.AirflowException: Exception when calling -> create_custom_object: (400) Reason: Bad Request HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'efbb7e0c-9754-45e5-ba08-360475af8161', 'X-Kubernetes-Pf-Prioritylevel-Uid': 
'67d4be08-a335-4cf1-a9cd-74d050906822', 'Date': 'Fri, 30 Jul 2021 09:08:06 GMT', 'Content-Length': '462'}) HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"the object provided is unrecognized (must be of type SparkApplication): couldn't get version/kind; json parse error: json: cannot unmarshal string into Go value of type struct { APIVersion string \"json:\\\"apiVersion,omitempty\\\"\"; Kind string \"json:\\\"kind,omitempty\\\"\" } (222f6f70742f616972666c6f772f646167732f737061726b2d70692d6672 ...)","reason":"BadRequest","code":400}
[2021-07-30 09:08:06,498] {taskinstance.py:1496} INFO - Marking task as FAILED. dag_id=spark_pi_try, task_id=spark_pi_trial, execution_date=20210730T090758, start_date=20210730T090806, end_date=20210730T090806
[2021-07-30 09:08:06,530] {local_task_job.py:146} INFO - Task exited with return code 1

</details>

**What you expected to happen**:

The contents of the YAML/JSON file should be loaded, not the file path.

**Suggestion**:

This happens because the file is never opened and read before the value is parsed. Reading the contents with `open` and `read` before loading works.

**How to reproduce it**:

1. Install and start minikube. Prerequisite: Docker 20.10.7 ```brew install minikube && minikube start```
2. Install helm ```brew install helm```
3. Install Airflow and the Spark operator using the Helm charts. Navigate to the root directory of each chart, edit the configuration to enable remote logging, and run the commands below to install. ```helm install airflow . -n <namespace>``` ```helm install spark-operator . -n <namespace>```
4. Create a Kubernetes Cluster Connection named "kubernetes_default" and check the "In cluster configuration" box.
5. Create a simple DAG and provide the path to the SparkApplication specification YAML file. Sample files are here: https://github.com/apache/airflow/tree/main/airflow/providers/cncf/kubernetes/example_dags
6. 
   Copy these DAGs into the DAGs location, i.e. /opt/airflow/dags, within the scheduler/driver pods.
7. Once the DAG is visible in the Airflow web UI, trigger the DAG and check the logs.

How often does this problem occur? Every time
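
For illustration, here is a minimal sketch of the suggestion above (read the file before parsing it). It is not the operator's actual code: `load_application_spec` is a hypothetical helper, the sample spec is a stand-in, and JSON is used so the sketch only needs the standard library (a YAML file would be read the same way and then parsed with a YAML loader). The first part decodes the hex prefix quoted in the error message, which shows that the request body was the quoted path rather than an object.

```python
import json
import os
import tempfile

# Hex prefix quoted in the API server's error message in the log above.
payload_hex = "222f6f70742f616972666c6f772f646167732f737061726b2d70692d6672"
decoded_prefix = bytes.fromhex(payload_hex).decode("ascii")
print(decoded_prefix)  # "/opt/airflow/dags/spark-pi-fr  (a quoted path, not a JSON object)


def load_application_spec(application_file: str) -> dict:
    """Hypothetical helper: accept either a file path or inline JSON text."""
    # If the value names an existing file, read its contents first.
    if os.path.isfile(application_file):
        with open(application_file, encoding="utf-8") as f:
            application_file = f.read()
    spec = json.loads(application_file)
    # A bare string or list is not a valid custom object body.
    if not isinstance(spec, dict):
        raise ValueError("SparkApplication spec must be a mapping")
    return spec


# Usage with a throwaway JSON file standing in for the real spec.
sample = {"apiVersion": "sparkoperator.k8s.io/v1beta2", "kind": "SparkApplication"}
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
    json.dump(sample, tmp)
    spec_path = tmp.name

spec = load_application_spec(spec_path)
os.remove(spec_path)
print(spec["kind"])  # SparkApplication
```

With a check like this, a path and an inline document both end up as a dict before the request is sent, and anything else fails early with a local error instead of a 400 from the API server.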
