MaheshSankaran opened a new issue, #46370:
URL: https://github.com/apache/airflow/issues/46370
### Apache Airflow Provider(s)
cncf-kubernetes
### Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==10.0.1
### Apache Airflow version
2.10.4
### Operating System
OS from the Docker image 2.10.4-python3.9
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
Airflow chart version is airflow-8.9.0
### What happened
Hi Team,
I am trying to run an Airflow DAG that uses SparkKubernetesOperator.
This is my Spark application manifest, which runs fine when submitted
directly to the Spark Kubernetes operator.
```
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: data-platform
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.2
  imagePullPolicy: IfNotPresent
  mainClass: "org.apache.spark.examples.SparkPi"
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar"
  arguments:
    - "5000"
  sparkVersion: 3.5.2
  driver:
    labels:
      version: 3.5.2
    cores: 1
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    labels:
      version: 3.5.2
    instances: 1
    cores: 1
    memory: 512m
```
**airflow dag file,**
```
from datetime import timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'Mahesh',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'spark_on_k8s_airflow',
    default_args=default_args,  # Add default_args to the DAG
    start_date=days_ago(1),
    catchup=False,
    schedule_interval=timedelta(days=1),
)

spark_k8s_task = SparkKubernetesOperator(
    task_id='spark-on-k8s-airflow',
    trigger_rule="all_success",
    depends_on_past=False,
    retries=0,
    application_file='spark_jobs/pi.yaml',
    namespace="data-platform",
    kubernetes_conn_id="dev-dp",
    dag=dag,
)
```
But the task fails with the YAML scanner error below, even though there is
no syntax issue in my YAML file.
```
[2025-02-03, 06:21:10 UTC] {taskinstance.py:3311} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 422, in wrapper
    return func(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 298, in execute
    kube_client=self.client,
  File "/usr/local/lib/python3.9/functools.py", line 993, in __get__
    val = self.func(instance)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 285, in client
    return self.hook.core_v1_client
  File "/usr/local/lib/python3.9/functools.py", line 993, in __get__
    val = self.func(instance)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 302, in core_v1_client
    return client.CoreV1Api(api_client=self.api_client)
  File "/usr/local/lib/python3.9/functools.py", line 993, in __get__
    val = self.func(instance)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 298, in api_client
    return self.get_conn()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 259, in get_conn
    config.load_kube_config(
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/config/kube_config.py", line 819, in load_kube_config
    loader = _get_kube_config_loader(
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/config/kube_config.py", line 771, in _get_kube_config_loader
    kcfg = KubeConfigMerger(filename)
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/config/kube_config.py", line 686, in __init__
    self._load_config_from_file_path(paths)
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/config/kube_config.py", line 711, in _load_config_from_file_path
    self.load_config(path)
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/config/kube_config.py", line 716, in load_config
    config = yaml.safe_load(f)
  File "/home/airflow/.local/lib/python3.9/site-packages/yaml/__init__.py", line 125, in safe_load
    return load(stream, SafeLoader)
  File "/home/airflow/.local/lib/python3.9/site-packages/yaml/__init__.py", line 81, in load
    return loader.get_single_data()
  File "/home/airflow/.local/lib/python3.9/site-packages/yaml/constructor.py", line 49, in get_single_data
    node = self.get_single_node()
  File "/home/airflow/.local/lib/python3.9/site-packages/yaml/composer.py", line 36, in get_single_node
    document = self.compose_document()
  File "/home/airflow/.local/lib/python3.9/site-packages/yaml/composer.py", line 55, in compose_document
    node = self.compose_node(None, None)
  File "/home/airflow/.local/lib/python3.9/site-packages/yaml/composer.py", line 84, in compose_node
    node = self.compose_mapping_node(anchor)
  File "/home/airflow/.local/lib/python3.9/site-packages/yaml/composer.py", line 127, in compose_mapping_node
    while not self.check_event(MappingEndEvent):
  File "/home/airflow/.local/lib/python3.9/site-packages/yaml/parser.py", line 98, in check_event
    self.current_event = self.state()
  File "/home/airflow/.local/lib/python3.9/site-packages/yaml/parser.py", line 428, in parse_block_mapping_key
    if self.check_token(KeyToken):
  File "/home/airflow/.local/lib/python3.9/site-packages/yaml/scanner.py", line 116, in check_token
    self.fetch_more_tokens()
  File "/home/airflow/.local/lib/python3.9/site-packages/yaml/scanner.py", line 223, in fetch_more_tokens
    return self.fetch_value()
  File "/home/airflow/.local/lib/python3.9/site-packages/yaml/scanner.py", line 577, in fetch_value
    raise ScannerError(None, None,
yaml.scanner.ScannerError: mapping values are not allowed here
  in "/tmp/tmpepozi80i", line 1, column 24
```
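Reading the traceback, the `ScannerError` is raised from `KubernetesHook.get_conn()` while `config.load_kube_config` parses a temp file (`/tmp/tmpepozi80i`), i.e. the kubeconfig built from the `dev-dp` connection, not `spark_jobs/pi.yaml`. As a hypothetical illustration (not the actual connection contents), a kubeconfig whose newlines were lost, for example pasted as a single line into the connection form, reproduces the same error near the start of line 1:

```python
import yaml

# Hypothetical example: a kubeconfig flattened onto one line. The second
# colon inside the plain scalar triggers the same "mapping values are not
# allowed here" error seen in the traceback above.
flattened = "apiVersion: v1 kind: Config clusters: []"

try:
    yaml.safe_load(flattened)
    problem = None
except yaml.scanner.ScannerError as exc:
    problem = exc.problem

print(problem)  # mapping values are not allowed here
```

If that is the cause here, re-pasting the kubeconfig into the connection with its line breaks preserved should make the error go away.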
### What you think should happen instead
_No response_
### How to reproduce
This is my Spark application manifest, which runs fine when submitted
directly to the Spark Kubernetes operator.
```
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: data-platform
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.2
  imagePullPolicy: IfNotPresent
  mainClass: "org.apache.spark.examples.SparkPi"
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar"
  arguments:
    - "5000"
  sparkVersion: 3.5.2
  driver:
    labels:
      version: 3.5.2
    cores: 1
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    labels:
      version: 3.5.2
    instances: 1
    cores: 1
    memory: 512m
```
**airflow dag file,**
```
from datetime import timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'Mahesh',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'spark_on_k8s_airflow',
    default_args=default_args,  # Add default_args to the DAG
    start_date=days_ago(1),
    catchup=False,
    schedule_interval=timedelta(days=1),
)

spark_k8s_task = SparkKubernetesOperator(
    task_id='spark-on-k8s-airflow',
    trigger_rule="all_success",
    depends_on_past=False,
    retries=0,
    application_file='spark_jobs/pi.yaml',
    namespace="data-platform",
    kubernetes_conn_id="dev-dp",
    dag=dag,
)
```
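For comparison, the manifest itself is valid YAML: feeding its header through the same `yaml.safe_load` call that fails in the traceback parses cleanly (a minimal sketch using only the first lines of the manifest):

```python
import yaml

# The same loader that raises in the traceback accepts the manifest above,
# which suggests the rejected document is not spark_jobs/pi.yaml but the
# kubeconfig written from the Airflow connection.
manifest = """\
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: data-platform
"""
app = yaml.safe_load(manifest)
print(app["kind"])  # SparkApplication
```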
### Anything else
_No response_
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)