fzkhouy commented on issue #27147:
URL: https://github.com/apache/airflow/issues/27147#issuecomment-1285531033

   > Can you please add a sample dag file to reproduce this? I tried below code with "config.yaml" relative to the dag file and with a print statement in airflow code before the yaml parsing at
   > 
   > https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L281-L284
   > . I was able to see the yaml content.
   > 
   > Edit : Please also ensure there is no trailing space in the filename after yaml extension which might also cause this.
   > 
   > ```python
   > import datetime
   > 
   > from airflow.decorators import dag
   > from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
   > 
   > @dag(start_date=datetime.datetime(2021, 1, 1))
   > def mydag():
   >     op = SparkKubernetesOperator(
   >             application_file="config.yaml",
   >             kubernetes_conn_id='kubernetes_with_namespace',
   >             task_id='test_task_id',
   >         )
   > 
   > mydag()
   > ```
   > 
   > ```
   > airflow dags test mydag
   > [2022-10-20 08:18:58,772] {dagbag.py:537} INFO - Filling up the DagBag from /files/dags
   > [2022-10-20 08:18:58,864] {dag.py:3654} INFO - dagrun id: mydag
   > /opt/airflow/airflow/models/dag.py:3669 RemovedInAirflow3Warning: Calling `DAG.create_dagrun()` without an explicit data interval is deprecated
   > [2022-10-20 08:18:58,893] {dag.py:3671} INFO - created dagrun <DagRun mydag @ 2022-10-20T08:18:58.772022+00:00: manual__2022-10-20T08:18:58.772022+00:00, state:running, queued_at: None. externally triggered: False>
   > [2022-10-20 08:18:58,905] {dag.py:3621} INFO - *****************************************************
   > [2022-10-20 08:18:58,905] {dag.py:3625} INFO - Running task test_task_id
   > [2022-10-20 08:18:59,323] {taskinstance.py:1587} INFO - Exporting the following env vars:
   > AIRFLOW_CTX_DAG_OWNER=airflow
   > AIRFLOW_CTX_DAG_ID=mydag
   > AIRFLOW_CTX_TASK_ID=test_task_id
   > AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T08:18:58.772022+00:00
   > AIRFLOW_CTX_TRY_NUMBER=1
   > AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T08:18:58.772022+00:00
   > [2022-10-20 08:18:59,323] {taskinstance.py:1587} INFO - Exporting the following env vars:
   > AIRFLOW_CTX_DAG_OWNER=airflow
   > AIRFLOW_CTX_DAG_ID=mydag
   > AIRFLOW_CTX_TASK_ID=test_task_id
   > AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T08:18:58.772022+00:00
   > AIRFLOW_CTX_TRY_NUMBER=1
   > AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T08:18:58.772022+00:00
   > [2022-10-20 08:18:59,323] {spark_kubernetes.py:70} INFO - Creating sparkApplication
   > [2022-10-20 08:18:59,323] {spark_kubernetes.py:70} INFO - Creating sparkApplication
   > apiVersion: "sparkoperator.k8s.io/v1beta2"
   > kind: SparkApplication
   > metadata:
   >   name: spark-pi
   >   namespace: default
   > spec:
   >   type: Scala
   >   mode: cluster
   >   image: "gcr.io/spark-operator/spark:v2.4.5"
   >   imagePullPolicy: Always
   >   mainClass: org.apache.spark.examples.SparkPi
   >   mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar"
   >   sparkVersion: "2.4.5"
   >   restartPolicy:
   >     type: Never
   >   volumes:
   >     - name: "test-volume"
   >       hostPath:
   >         path: "/tmp"
   >         type: Directory
   >   driver:
   >     cores: 1
   >     coreLimit: "1200m"
   >     memory: "512m"
   >     labels:
   >       version: 2.4.5
   >     serviceAccount: spark
   >     volumeMounts:
   >       - name: "test-volume"
   >         mountPath: "/tmp"
   >   executor:
   >     cores: 1
   >     instances: 1
   >     memory: "512m"
   >     labels:
   >       version: 2.4.5
   >     volumeMounts:
   >       - name: "test-volume"
   >         mountPath: "/tmp"
   > ```
   
   **Thank you for your reply. Well, this is the DAG file that reproduces the problem:**
   ```python
   from __future__ import annotations
   from datetime import datetime, timedelta
   from airflow import DAG
   # Operators; we need this to operate!
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
   from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
   from airflow.operators.empty import EmptyOperator
   
   
   with DAG(
       dag_id="spark_pi",
       default_args={'max_active_runs': 1},
       description='submit spark-pi as sparkApplication on kubernetes',
       schedule=timedelta(days=1),
       start_date=datetime(2021, 1, 1),
       catchup=False,
   ) as dag:
       t0 = EmptyOperator(task_id='airflow_health_check', retries=1, dag=dag)
       t1 = SparkKubernetesOperator(
           task_id='main_app',
           kubernetes_conn_id='kubernetes_cluster',
           namespace="spark",
           application_file="/opt/airflow/kubernetes/templates/airflow-worker-template.yaml",
           do_xcom_push=True,
           dag=dag,
   
       )
       t2 = SparkKubernetesSensor(
           task_id='spark_pi_monitor',
           kubernetes_conn_id='kubernetes_cluster',
           namespace="spark",
           application_name="{{ task_instance.xcom_pull(task_ids='main_app')['metadata']['name'] }}",
           dag=dag,
       )
       t0 >> t1 >> t2
   ```
    **When debugging on my own, I found that the problem is in this function:**
   
https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L33-L38
   
   because, when debugging it:
   
   - _reproducing the same problem with this code (the actual call made in Airflow)_
   ```python
   from airflow.utils import yaml

   # the path string itself is parsed as YAML, not the file it points to
   body = yaml.safe_load("airflow-worker-template.yaml")
   ```
   The body in this case is just the path: printing it gives `"airflow-worker-template.yaml"`.
   So we get the problem, which is `TypeError: string indices must be integers`, when we try to access `"metadata"` (see the standalone illustration after the next case).
   - _resolving the problem_
   With the same code, I just give the opened file object instead of the path to the function, like this:
   ```python
   from airflow.utils import yaml

   # pass a file object so safe_load parses the file's contents
   with open("airflow-worker-template.yaml") as yaml_file:
       body = yaml.safe_load(yaml_file)
   ```
   In that case the body holds the YAML content (a dict), so we can access `"metadata"`.
   Therefore, what I suggest is that the function should verify whether the given value ends with `".yaml"`, which would mean it is a path to a file we can open; otherwise, it should keep working with the string as it does now, for example:
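   A rough sketch of that suggestion (`load_application_body` is a hypothetical helper name, not existing Airflow code):
   ```python
   from airflow.utils import yaml


   def load_application_body(body: str) -> dict:
       """Treat values ending in .yaml/.yml as file paths; otherwise parse as inline YAML."""
       candidate = body.strip()
       if candidate.endswith((".yaml", ".yml")):
           # assumed to be a path: open the file and parse its contents
           with open(candidate) as yaml_file:
               return yaml.safe_load(yaml_file)
       # otherwise keep the current behaviour and parse the string itself
       return yaml.safe_load(body)
   ```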

