vijayakumarpattanashetti opened a new issue #17371:
URL: https://github.com/apache/airflow/issues/17371


   **Apache Airflow version**: 2.0.1
   
   **Kubernetes version**: 1.20.7
   
   **Driver**: Docker 20.10.7
   
   **Environment**: Local minikube
   
   - **Cloud provider or hardware configuration**: 2 GHz Quad-Core Intel Core i5 Processor, 16GB RAM, 500GB SSD
   - **OS**: macOS Catalina Version 10.15.7
   - **Kernel**: Darwin Kernel Version 19.6.0: Thu Oct 29 22:56:45 PDT 2020; root:xnu-6153.141.2.2~1/RELEASE_X86_64 x86_64
   - **Install tools**: Airflow Helm chart (https://github.com/airflow-helm/charts/tree/main/charts/airflow) version 8.3.2, Spark Operator Helm chart (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/charts/spark-operator-chart) version 1.1.3
   
   **What happened**:
   
   When the path to an existing SparkApplication YAML/JSON file is supplied as the `application_file` value, the `SparkKubernetesOperator` loads the path string itself instead of the file's contents. The Kubernetes API server therefore rejects the request with a JSON parse error.
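The failure mode can be illustrated without a cluster. This minimal sketch assumes that the path string itself ends up serialized as the JSON request body, which is consistent with the `cannot unmarshal string` message in the log. `/opt/airflow/dags/spark-pi.yaml` is a hypothetical path.

```python
import json

# Hypothetical application_file value: a path to the spec, not the spec itself.
application_file = "/opt/airflow/dags/spark-pi.yaml"

# Serializing the bare path yields a JSON string literal, not a JSON object.
body = json.dumps(application_file)

# The API server needs an object with top-level apiVersion/kind fields to
# determine the type; a string has neither, hence "couldn't get version/kind".
decoded = json.loads(body)
print(body)                    # "/opt/airflow/dags/spark-pi.yaml"
print(type(decoded).__name__)  # str
```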
   
   <details><summary>worker.log</summary> *** Reading remote log from 
s3://airflow-logs/spark_pi_try/spark_pi_trial/2021-07-30T09:07:58.926987+00:00/1.log.
   [2021-07-30 09:08:06,170] {taskinstance.py:851} INFO - Dependencies all met 
for <TaskInstance: spark_pi_try.spark_pi_trial 2021-07-30T09:07:58.926987+00:00 
[queued]>
   [2021-07-30 09:08:06,178] {taskinstance.py:851} INFO - Dependencies all met 
for <TaskInstance: spark_pi_try.spark_pi_trial 2021-07-30T09:07:58.926987+00:00 
[queued]>
   [2021-07-30 09:08:06,178] {taskinstance.py:1042} INFO - 
   
--------------------------------------------------------------------------------
   [2021-07-30 09:08:06,178] {taskinstance.py:1043} INFO - Starting attempt 1 
of 1
   [2021-07-30 09:08:06,178] {taskinstance.py:1044} INFO - 
   
--------------------------------------------------------------------------------
   [2021-07-30 09:08:06,190] {taskinstance.py:1063} INFO - Executing 
<Task(SparkKubernetesOperator): spark_pi_trial> on 
2021-07-30T09:07:58.926987+00:00
   [2021-07-30 09:08:06,192] {standard_task_runner.py:52} INFO - Started 
process 26 to run task
   [2021-07-30 09:08:06,196] {standard_task_runner.py:76} INFO - Running: 
['airflow', 'tasks', 'run', 'spark_pi_try', 'spark_pi_trial', 
'2021-07-30T09:07:58.926987+00:00', '--job-id', '3', '--pool', 'default_pool', 
'--raw', '--subdir', 'DAGS_FOLDER/spark-opr-dag2.py', '--cfg-path', 
'/tmp/tmpm_sedozj', '--error-file', '/tmp/tmp7gd166ej']
   [2021-07-30 09:08:06,197] {standard_task_runner.py:77} INFO - Job 3: Subtask 
spark_pi_trial
   [2021-07-30 09:08:06,302] {logging_mixin.py:104} INFO - Running 
<TaskInstance: spark_pi_try.spark_pi_trial 2021-07-30T09:07:58.926987+00:00 
[running]> on host sparkpitrysparkpitrial.05ac566507de466a8c0cfe980bdc00d0
   [2021-07-30 09:08:06,382] {logging_mixin.py:104} WARNING - 
/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:332 
DeprecationWarning: The remote_logging option in [core] has been moved to the 
remote_logging option in [logging] - the old setting has been used, but please 
update your config.
   [2021-07-30 09:08:06,383] {logging_mixin.py:104} WARNING - 
/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:332 
DeprecationWarning: The remote_log_conn_id option in [core] has been moved to 
the remote_log_conn_id option in [logging] - the old setting has been used, but 
please update your config.
   [2021-07-30 09:08:06,384] {logging_mixin.py:104} WARNING - 
/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:332 
DeprecationWarning: The remote_base_log_folder option in [core] has been moved 
to the remote_base_log_folder option in [logging] - the old setting has been 
used, but please update your config.
   [2021-07-30 09:08:06,385] {logging_mixin.py:104} WARNING - 
/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:332 
DeprecationWarning: The encrypt_s3_logs option in [core] has been moved to the 
encrypt_s3_logs option in [logging] - the old setting has been used, but please 
update your config.
   [2021-07-30 09:08:06,464] {taskinstance.py:1255} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=spark_pi_try
   AIRFLOW_CTX_TASK_ID=spark_pi_trial
   AIRFLOW_CTX_EXECUTION_DATE=2021-07-30T09:07:58.926987+00:00
   AIRFLOW_CTX_DAG_RUN_ID=manual__2021-07-30T09:07:58.926987+00:00
   [2021-07-30 09:08:06,464] {spark_kubernetes.py:60} INFO - Creating 
sparkApplication
   [2021-07-30 09:08:06,495] {taskinstance.py:1455} ERROR - Exception when 
calling -> create_custom_object: (400)
   Reason: Bad Request
   HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 
'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 
'efbb7e0c-9754-45e5-ba08-360475af8161', 'X-Kubernetes-Pf-Prioritylevel-Uid': 
'67d4be08-a335-4cf1-a9cd-74d050906822', 'Date': 'Fri, 30 Jul 2021 09:08:06 
GMT', 'Content-Length': '462'})
   HTTP response body: 
{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"the
 object provided is unrecognized (must be of type SparkApplication): couldn't 
get version/kind; json parse error: json: cannot unmarshal string into Go value 
of type struct { APIVersion string \"json:\\\"apiVersion,omitempty\\\"\"; Kind 
string \"json:\\\"kind,omitempty\\\"\" } 
(222f6f70742f616972666c6f772f646167732f737061726b2d70692d6672 
...)","reason":"BadRequest","code":400}
   
   
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py",
 line 166, in create_custom_object
       response = api.create_namespaced_custom_object(
     File 
"/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py",
 line 183, in create_namespaced_custom_object
       (data) = self.create_namespaced_custom_object_with_http_info(group, 
version, namespace, plural, body, **kwargs)  # noqa: E501
     File 
"/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py",
 line 275, in create_namespaced_custom_object_with_http_info
       return self.api_client.call_api(
     File 
"/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py",
 line 340, in call_api
       return self.__call_api(resource_path, method,
     File 
"/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py",
 line 172, in __call_api
       response_data = self.request(
     File 
"/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py",
 line 382, in request
       return self.rest_client.POST(url,
     File 
"/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", 
line 272, in POST
       return self.request("POST", url,
     File 
"/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", 
line 231, in request
       raise ApiException(http_resp=r)
   kubernetes.client.rest.ApiException: (400)
   Reason: Bad Request
   HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 
'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 
'efbb7e0c-9754-45e5-ba08-360475af8161', 'X-Kubernetes-Pf-Prioritylevel-Uid': 
'67d4be08-a335-4cf1-a9cd-74d050906822', 'Date': 'Fri, 30 Jul 2021 09:08:06 
GMT', 'Content-Length': '462'})
   HTTP response body: 
{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"the
 object provided is unrecognized (must be of type SparkApplication): couldn't 
get version/kind; json parse error: json: cannot unmarshal string into Go value 
of type struct { APIVersion string \"json:\\\"apiVersion,omitempty\\\"\"; Kind 
string \"json:\\\"kind,omitempty\\\"\" } 
(222f6f70742f616972666c6f772f646167732f737061726b2d70692d6672 
...)","reason":"BadRequest","code":400}
   
   
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 1112, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 1285, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 1315, in _execute_task
       result = task_copy.execute(context=context)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py",
 line 62, in execute
       response = hook.create_custom_object(
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py",
 line 172, in create_custom_object
       raise AirflowException(f"Exception when calling -> create_custom_object: 
{e}\n")
   airflow.exceptions.AirflowException: Exception when calling -> 
create_custom_object: (400)
   Reason: Bad Request
   HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 
'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 
'efbb7e0c-9754-45e5-ba08-360475af8161', 'X-Kubernetes-Pf-Prioritylevel-Uid': 
'67d4be08-a335-4cf1-a9cd-74d050906822', 'Date': 'Fri, 30 Jul 2021 09:08:06 
GMT', 'Content-Length': '462'})
   HTTP response body: 
{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"the
 object provided is unrecognized (must be of type SparkApplication): couldn't 
get version/kind; json parse error: json: cannot unmarshal string into Go value 
of type struct { APIVersion string \"json:\\\"apiVersion,omitempty\\\"\"; Kind 
string \"json:\\\"kind,omitempty\\\"\" } 
(222f6f70742f616972666c6f772f646167732f737061726b2d70692d6672 
...)","reason":"BadRequest","code":400}
   
   
   
   [2021-07-30 09:08:06,498] {taskinstance.py:1496} INFO - Marking task as 
FAILED. dag_id=spark_pi_try, task_id=spark_pi_trial, 
execution_date=20210730T090758, start_date=20210730T090806, 
end_date=20210730T090806
   [2021-07-30 09:08:06,530] {local_task_job.py:146} INFO - Task exited with 
return code 1 </details>
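The hex fragment at the end of the API server's message appears to be a preview of the first bytes of the rejected request body. Decoding it shows that the body began with a double quote followed by a path in the DAGs folder. This means the server received a JSON string rather than a SparkApplication object. The fragment is truncated by the server, so only the beginning of the path can be recovered.

```python
# Hex prefix copied verbatim from the 400 response body in the log above.
fragment = "222f6f70742f616972666c6f772f646167732f737061726b2d70692d6672"

decoded = bytes.fromhex(fragment).decode("utf-8")
print(decoded)  # "/opt/airflow/dags/spark-pi-fr
```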
   
   
![0-02-03-711ab2c3e352194eec49082e8ddb5914028f4ba88b9de789d4bf156057bba134_1c6da3f411b301](https://user-images.githubusercontent.com/35587732/127818257-aa06c2c6-c216-4f28-a73a-dc5c76d89d36.jpg)
   
   
   **What you expected to happen**:
   
   The contents of the YAML/JSON file should be loaded, not the file path itself.
   
   **Suggestion**:
   This happens because the file is never opened and read before its contents are loaded. Reading the file with `open()`/`read()` first and then loading the resulting contents works.
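Below is a minimal sketch of the suggested fix, written as a standalone helper rather than as a patch to the provider. `load_application_spec` is a hypothetical name. The YAML fallback assumes PyYAML is importable, as it normally is in an Airflow environment. The helper reads the file when the value is an existing path and otherwise treats the value as inline spec text. It rejects anything that does not parse to a mapping.

```python
import json
import os


def load_application_spec(application_file: str) -> dict:
    """Hypothetical helper: resolve an application_file value to a spec dict.

    Accepts either a path to a SparkApplication YAML/JSON file or the
    spec text itself.
    """
    # Read the file's contents when the value points to an existing file;
    # otherwise treat the value as inline YAML/JSON text.
    if os.path.isfile(application_file):
        with open(application_file, encoding="utf-8") as f:
            text = f.read()
    else:
        text = application_file

    try:
        spec = json.loads(text)
    except json.JSONDecodeError:
        # Not JSON: fall back to YAML (assumes PyYAML is importable).
        import yaml

        spec = yaml.safe_load(text)

    # A bare path parses to a str, which is exactly what the API server rejected.
    if not isinstance(spec, dict):
        raise ValueError(
            f"application_file did not resolve to a mapping: {application_file!r}"
        )
    return spec
```

With a helper like this, a path such as `/opt/airflow/dags/spark-pi.yaml` (hypothetical) resolves to the parsed SparkApplication dict. That dict, rather than the path string, would then be sent as the request body.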
   
   **How to reproduce it**:
   1. Install and start minikube
      Prerequisite: Docker 20.10.7
      ```brew install minikube && minikube start```
   2. Install helm
      ```brew install helm```
   3. Install Airflow and the Spark operator using their Helm charts.
       Navigate to each chart's root directory, edit the configuration to set up remote logging, and run the commands below to install.
       ```helm install airflow . -n <namespace>```
       ```helm install spark-operator . -n <namespace>```
   4. Create a Kubernetes Cluster Connection named "kubernetes_default" and check the "In cluster configuration" box.
   5. Create a simple DAG and provide the path to the SparkApplication specification YAML file.
       Find sample files here: 
https://github.com/apache/airflow/tree/main/airflow/providers/cncf/kubernetes/example_dags
   6. Copy these DAGs into the DAGs folder, i.e. /opt/airflow/dags, within the scheduler/driver pods.
   7. Once the DAG is visible in the Airflow web UI, trigger it and check the logs.
   
   **How often does this problem occur?**
   Every time.


-- 
This is an automated message from the Apache Git Service.