MaheshSankaran opened a new issue, #46370:
URL: https://github.com/apache/airflow/issues/46370

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-cncf-kubernetes==10.0.1
   
   ### Apache Airflow version
   
   2.10.4
   
   ### Operating System
   
   OS from the 2.10.4-python3.9 Docker image
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   Airflow Helm chart version: airflow-8.9.0
   
   ### What happened
   
   Hi team,
   I am trying to run a SparkKubernetesOperator-based Airflow DAG.
   This is my Spark deployment file, which runs fine when submitted directly to the Spark Kubernetes operator:
   ```
   apiVersion: sparkoperator.k8s.io/v1beta2
   kind: SparkApplication
   metadata:
     name: spark-pi
     namespace: data-platform
   spec:
     type: Scala
     mode: cluster
     image: spark:3.5.2
     imagePullPolicy: IfNotPresent
     mainClass: "org.apache.spark.examples.SparkPi"
     mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar"
     arguments:
       - "5000"
     sparkVersion: 3.5.2
     driver:
       labels:
         version: 3.5.2
       cores: 1
       memory: 512m
       serviceAccount: spark-operator-spark
     executor:
       labels:
         version: 3.5.2
       instances: 1
       cores: 1
       memory: 512m
   ```
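
   As a sanity check, the manifest parses cleanly with PyYAML (the same library the stack trace further down fails in), so the file itself is valid YAML. A minimal check, using an abbreviated copy of the manifest above:

   ```python
   import yaml

   # Abbreviated copy of the SparkApplication manifest above; parsing it with
   # safe_load would raise yaml.scanner.ScannerError if the syntax were bad.
   manifest = """\
   apiVersion: sparkoperator.k8s.io/v1beta2
   kind: SparkApplication
   metadata:
     name: spark-pi
     namespace: data-platform
   spec:
     type: Scala
     mode: cluster
     image: spark:3.5.2
   """

   # A clean parse returns a plain dict, confirming the manifest is well-formed.
   doc = yaml.safe_load(manifest)
   print(doc["kind"], doc["metadata"]["name"])
   ```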
   **Airflow DAG file:**
   ```
   from datetime import timedelta
   import os
   from airflow import DAG
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
   from airflow.utils.dates import days_ago
   from airflow.models import Variable
   
   
   default_args = {
       'owner': 'Mahesh',
       'retries': 3,
       'retry_delay': timedelta(minutes=5)
   }
   
   
   dag = DAG(
       'spark_on_k8s_airflow',
       default_args=default_args,  # Add default_args to the DAG
       start_date=days_ago(1),
       catchup=False,
       schedule_interval=timedelta(days=1)
   )
   
   spark_k8s_task = SparkKubernetesOperator(
       task_id='spark-on-k8s-airflow',
       trigger_rule="all_success",
       depends_on_past=False,
       retries=0,
       application_file='spark_jobs/pi.yaml',
       namespace="data-platform",
       kubernetes_conn_id="dev-dp",
       dag=dag
   )
   
   spark_k8s_task
   ```
   But the task fails with the YAML scanner error below, even though there is no syntax issue in my YAML file:
   ```
   [2025-02-03, 06:21:10 UTC] {taskinstance.py:3311} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
       return ExecutionCallableRunner(
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/operator_helpers.py", line 252, in run
       return self.func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 422, in wrapper
       return func(self, *args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 298, in execute
       kube_client=self.client,
     File "/usr/local/lib/python3.9/functools.py", line 993, in __get__
       val = self.func(instance)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 285, in client
       return self.hook.core_v1_client
     File "/usr/local/lib/python3.9/functools.py", line 993, in __get__
       val = self.func(instance)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 302, in core_v1_client
       return client.CoreV1Api(api_client=self.api_client)
     File "/usr/local/lib/python3.9/functools.py", line 993, in __get__
       val = self.func(instance)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 298, in api_client
       return self.get_conn()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 259, in get_conn
       config.load_kube_config(
     File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/config/kube_config.py", line 819, in load_kube_config
       loader = _get_kube_config_loader(
     File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/config/kube_config.py", line 771, in _get_kube_config_loader
       kcfg = KubeConfigMerger(filename)
     File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/config/kube_config.py", line 686, in __init__
       self._load_config_from_file_path(paths)
     File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/config/kube_config.py", line 711, in _load_config_from_file_path
       self.load_config(path)
     File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/config/kube_config.py", line 716, in load_config
       config = yaml.safe_load(f)
     File "/home/airflow/.local/lib/python3.9/site-packages/yaml/__init__.py", line 125, in safe_load
       return load(stream, SafeLoader)
     File "/home/airflow/.local/lib/python3.9/site-packages/yaml/__init__.py", line 81, in load
       return loader.get_single_data()
     File "/home/airflow/.local/lib/python3.9/site-packages/yaml/constructor.py", line 49, in get_single_data
       node = self.get_single_node()
     File "/home/airflow/.local/lib/python3.9/site-packages/yaml/composer.py", line 36, in get_single_node
       document = self.compose_document()
     File "/home/airflow/.local/lib/python3.9/site-packages/yaml/composer.py", line 55, in compose_document
       node = self.compose_node(None, None)
     File "/home/airflow/.local/lib/python3.9/site-packages/yaml/composer.py", line 84, in compose_node
       node = self.compose_mapping_node(anchor)
     File "/home/airflow/.local/lib/python3.9/site-packages/yaml/composer.py", line 127, in compose_mapping_node
       while not self.check_event(MappingEndEvent):
     File "/home/airflow/.local/lib/python3.9/site-packages/yaml/parser.py", line 98, in check_event
       self.current_event = self.state()
     File "/home/airflow/.local/lib/python3.9/site-packages/yaml/parser.py", line 428, in parse_block_mapping_key
       if self.check_token(KeyToken):
     File "/home/airflow/.local/lib/python3.9/site-packages/yaml/scanner.py", line 116, in check_token
       self.fetch_more_tokens()
     File "/home/airflow/.local/lib/python3.9/site-packages/yaml/scanner.py", line 223, in fetch_more_tokens
       return self.fetch_value()
     File "/home/airflow/.local/lib/python3.9/site-packages/yaml/scanner.py", line 577, in fetch_value
       raise ScannerError(None, None,
   yaml.scanner.ScannerError: mapping values are not allowed here
     in "/tmp/tmpepozi80i", line 1, column 24
   ```
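
   Note that the error points at `/tmp/tmpepozi80i`, which appears to be the temporary kubeconfig file the Kubernetes hook writes out from the `dev-dp` connection before calling `config.load_kube_config()`, not the application file. A failure at line 1, column 24 of that file suggests the kubeconfig stored in the connection is malformed, e.g. pasted as a single flattened line. A minimal sketch of how PyYAML reacts to such input (the kubeconfig contents here are hypothetical):

   ```python
   import yaml

   # Hypothetical example: a kubeconfig collapsed onto one line, as can happen
   # when pasting raw YAML into a connection field. The second ":" on the line
   # is illegal in block-style YAML.
   flattened = "apiVersion: v1 kind: Config clusters: []"

   try:
       yaml.safe_load(flattened)
       error = None
   except yaml.YAMLError as exc:
       error = exc

   # PyYAML rejects it with the same message seen in the task log above.
   print(type(error).__name__, "-", error.problem)
   ```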
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   Use the SparkApplication manifest and Airflow DAG file shown above in the "What happened" section.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

