fzkhouy commented on issue #27147: URL: https://github.com/apache/airflow/issues/27147#issuecomment-1285531033
> Can you please add a sample dag file to reproduce this? I tried below code with "config.yaml" relative to the dag file and with a print statement in airflow code before the yaml parsing at https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L281-L284. I was able to see the yaml content.
>
> Edit: Please also ensure there is no trailing space in the filename after the yaml extension, which might also cause this.
>
> ```python
> import datetime
>
> from airflow.decorators import dag
> from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
>
>
> @dag(start_date=datetime.datetime(2021, 1, 1))
> def mydag():
>     op = SparkKubernetesOperator(
>         application_file="config.yaml",
>         kubernetes_conn_id='kubernetes_with_namespace',
>         task_id='test_task_id',
>     )
>
>
> mydag()
> ```
>
> ```
> airflow dags test mydag
> [2022-10-20 08:18:58,772] {dagbag.py:537} INFO - Filling up the DagBag from /files/dags
> [2022-10-20 08:18:58,864] {dag.py:3654} INFO - dagrun id: mydag
> /opt/airflow/airflow/models/dag.py:3669 RemovedInAirflow3Warning: Calling `DAG.create_dagrun()` without an explicit data interval is deprecated
> [2022-10-20 08:18:58,893] {dag.py:3671} INFO - created dagrun <DagRun mydag @ 2022-10-20T08:18:58.772022+00:00: manual__2022-10-20T08:18:58.772022+00:00, state:running, queued_at: None. externally triggered: False>
> [2022-10-20 08:18:58,905] {dag.py:3621} INFO - *****************************************************
> [2022-10-20 08:18:58,905] {dag.py:3625} INFO - Running task test_task_id
> [2022-10-20 08:18:59,323] {taskinstance.py:1587} INFO - Exporting the following env vars:
> AIRFLOW_CTX_DAG_OWNER=airflow
> AIRFLOW_CTX_DAG_ID=mydag
> AIRFLOW_CTX_TASK_ID=test_task_id
> AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T08:18:58.772022+00:00
> AIRFLOW_CTX_TRY_NUMBER=1
> AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T08:18:58.772022+00:00
> [2022-10-20 08:18:59,323] {taskinstance.py:1587} INFO - Exporting the following env vars:
> AIRFLOW_CTX_DAG_OWNER=airflow
> AIRFLOW_CTX_DAG_ID=mydag
> AIRFLOW_CTX_TASK_ID=test_task_id
> AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T08:18:58.772022+00:00
> AIRFLOW_CTX_TRY_NUMBER=1
> AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T08:18:58.772022+00:00
> [2022-10-20 08:18:59,323] {spark_kubernetes.py:70} INFO - Creating sparkApplication
> [2022-10-20 08:18:59,323] {spark_kubernetes.py:70} INFO - Creating sparkApplication
> apiVersion: "sparkoperator.k8s.io/v1beta2"
> kind: SparkApplication
> metadata:
>   name: spark-pi
>   namespace: default
> spec:
>   type: Scala
>   mode: cluster
>   image: "gcr.io/spark-operator/spark:v2.4.5"
>   imagePullPolicy: Always
>   mainClass: org.apache.spark.examples.SparkPi
>   mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar"
>   sparkVersion: "2.4.5"
>   restartPolicy:
>     type: Never
>   volumes:
>     - name: "test-volume"
>       hostPath:
>         path: "/tmp"
>         type: Directory
>   driver:
>     cores: 1
>     coreLimit: "1200m"
>     memory: "512m"
>     labels:
>       version: 2.4.5
>     serviceAccount: spark
>     volumeMounts:
>       - name: "test-volume"
>         mountPath: "/tmp"
>   executor:
>     cores: 1
>     instances: 1
>     memory: "512m"
>     labels:
>       version: 2.4.5
>     volumeMounts:
>       - name: "test-volume"
>         mountPath: "/tmp"
> ```

**Thank you for your reply. Well, this is the DAG file that reproduces the problem:**

```python
from __future__ import annotations

from datetime import datetime, timedelta

from airflow import DAG

# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="spark_pi",
    default_args={'max_active_runs': 1},
    description='submit spark-pi as sparkApplication on kubernetes',
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    t0 = EmptyOperator(task_id='airflow_health_check', retries=1, dag=dag)
    t1 = SparkKubernetesOperator(
        task_id='main_app',
        kubernetes_conn_id='kubernetes_cluster',
        namespace="spark",
        application_file="/opt/airflow/kubernetes/templates/airflow-worker-template.yaml",
        do_xcom_push=True,
        dag=dag,
    )
    t2 = SparkKubernetesSensor(
        task_id='spark_pi_monitor',
        kubernetes_conn_id='kubernetes_cluster',
        namespace="spark",
        application_name="{{ task_instance.xcom_pull(task_ids='main_app')['metadata']['name'] }}",
        dag=dag,
    )
    t0 >> t1 >> t2
```

**When debugging on my own, I found that the problem is in this function:**

https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L33-L38

When debugging it:

- _Reproducing the same problem with this code (the actual code in airflow; a standalone demonstration follows after this list):_

  ```python
  from airflow.utils import yaml

  body = yaml.safe_load("airflow-worker-template.yaml")
  ```

  The body in this case is the path itself: printing it gives `"airflow-worker-template.yaml"`. So we get the problem, `TypeError: string indices must be integers`, when we want to access `"metadata"`...

- _Resolving the problem:_

  With the same code, I just give the yaml file object instead of the path to the function, like this:

  ```python
  from airflow.utils import yaml

  with open("airflow-worker-template.yaml") as yaml_file:
      body = yaml.safe_load(yaml_file)
  ```

  In that case the body holds the yaml content, so we can access `"metadata"`.
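To make the failure mode above concrete, here is a minimal standalone sketch of it. It uses plain PyYAML directly; treating `airflow.utils.yaml` as a thin wrapper exposing the same `safe_load` is an assumption here:

```python
import yaml  # plain PyYAML; airflow.utils.yaml wraps the same API (assumption)

# A bare file path is itself valid YAML (a single scalar), so safe_load
# returns the path string unchanged instead of the file's contents.
body = yaml.safe_load("airflow-worker-template.yaml")
print(type(body))  # <class 'str'>

# Indexing a str with a str key is exactly what produces the reported error.
try:
    body["metadata"]
except TypeError as exc:
    print(exc)  # e.g. "string indices must be integers"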
Therefore, what I see is that the function needs to verify whether the given value ends with `".yaml"`, which would mean it is a path to a file we can then open; otherwise, it should work with the value as it does today.
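For illustration, a minimal sketch of that check, written as a hypothetical standalone variant of the linked helper (the `_load_body_to_dict` name matches the lines linked above, but the guard, the extra `.yml` extension, and the `ValueError` are my assumptions, not the provider's actual code):

```python
import os

import yaml  # stand-in for airflow.utils.yaml in this standalone sketch


def _load_body_to_dict(body):
    """Hypothetical variant of the linked helper that accepts either raw
    YAML text or a path to a YAML file; the path check is the suggestion
    above, not the provider's current behavior."""
    # If the value looks like an existing .yaml/.yml file, read it first
    # so the parser sees the file's contents rather than the bare path.
    if isinstance(body, str) and body.endswith((".yaml", ".yml")) and os.path.isfile(body):
        with open(body) as yaml_file:
            body = yaml_file.read()
    try:
        return yaml.safe_load(body)
    except yaml.YAMLError as exc:
        # The provider raises its own exception type; ValueError keeps this sketch standalone.
        raise ValueError(f"Exception when loading resource definition: {exc}") from exc
```

Checking `os.path.isfile` in addition to the extension avoids misreading inline YAML that merely happens to end in `.yaml`, and keeps plain YAML strings on the existing code path.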
