GitHub user MeltonSmith added a comment to the discussion: Spark-submit task fails on deploy mode cluster

I've faced the same issue recently:
- Airflow 2.10.4
- Spark 3.5.4

I still don't get why 6066 is supposed to help :)

As the author says, the problem is in the `_resolve_should_track_driver_status` method, which looks like this:
```python
def _resolve_should_track_driver_status(self):
    """
    Determine whether this hook should poll the Spark driver status through
    subsequent spark-submit status requests after the initial spark-submit request.

    :return: whether the driver status should be tracked
    """
    return ('spark://' in self._connection['master'] and
            self._connection['deploy_mode'] == 'cluster')
```
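To make the check concrete, here is a minimal stand-alone sketch of that predicate (the function name and sample URLs are mine, not the provider's):

```python
# Illustrative stand-alone version of the check above, not the provider's code.
def should_track_driver_status(master: str, deploy_mode: str) -> bool:
    # Polling only kicks in for a standalone master URL ("spark://...")
    # combined with cluster deploy mode.
    return "spark://" in master and deploy_mode == "cluster"

print(should_track_driver_status("spark://spark-master:7077", "cluster"))  # True: polling on
print(should_track_driver_status("spark://spark-master:7077", "client"))   # False: client mode
print(should_track_driver_status("sc://spark-master:15002", "cluster"))    # False: Spark Connect-style URL
```

Note that a Spark Connect-style master URL never matches the `'spark://'` substring, so the driver-status polling is skipped entirely.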
So I changed the type of the Spark connection in the "Connections" section of Airflow from "Spark" to "Spark Connect":
![image](https://github.com/user-attachments/assets/b244c8e0-0592-4935-a913-c0a30546cbd8)


The DAG is nothing extraordinary:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

s3_log_path = "s3a://spark-history-server/history"

spark_config = {
    "spark.sql.shuffle.partitions": 8,
    "spark.executor.memory": "4G",
    "spark.driver.memory": "4G",
    "spark.submit.deployMode": "cluster",
    "spark.standalone.submit.waitAppCompletion": "true",
    "spark.hadoop.fs.s3a.endpoint": "",
    "spark.hadoop.fs.s3a.access.key": "",
    "spark.hadoop.fs.s3a.secret.key": "",
}

with DAG(
    'foo',
    default_args={
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Some desc',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    t1 = SparkSubmitOperator(
        application="s3a://bucket/artifacts/driver/2.1.0/driver-2.1.0.jar",
        java_class="melt.smith.engine.driver.DriverApp",
        conf=spark_config,
        conn_id="spark_default",
        task_id="some_task_name",  # task_id may not contain spaces
    )
```
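For context, here is a rough sketch (my own simplification, not the provider's actual code) of how a conf dict like the one above ends up on the spark-submit command line:

```python
# Illustrative sketch of how SparkSubmitOperator's conf dict maps onto
# spark-submit arguments; function name and structure are mine, not the hook's.
def build_spark_submit_args(conf: dict, master: str, deploy_mode: str,
                            java_class: str, application: str) -> list:
    args = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode]
    # Each conf entry becomes a separate --conf key=value pair.
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    args += ["--class", java_class, application]
    return args

args = build_spark_submit_args(
    {"spark.executor.memory": "4G"},
    master="spark://spark-master:7077",  # assumed standalone master host
    deploy_mode="cluster",
    java_class="melt.smith.engine.driver.DriverApp",
    application="s3a://bucket/artifacts/driver/2.1.0/driver-2.1.0.jar",
)
print(" ".join(args))
```

With a standalone `spark://` master and `--deploy-mode cluster`, it is exactly this combination that trips the tracking predicate above.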

GitHub link: 
https://github.com/apache/airflow/discussions/21799#discussioncomment-11912410
