yehoon17 opened a new issue, #46169:
URL: https://github.com/apache/airflow/issues/46169

   ### Apache Airflow Provider(s)
   
   apache-spark
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-spark==5.0.0
   
   
   ### Apache Airflow version
   
   2.10.4
   
   ### Operating System
   
   Microsoft Windows 11
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Docker version 27.4.0, build bde2b89
   Docker Compose version v2.31.0-desktop.2
   Docker Desktop 4.37.1 (178610)
   
   ### What happened
   
   When using Docker Compose to configure Apache Airflow with Spark, the 
`SparkSubmitOperator` fails to execute the Spark job due to an incorrect 
`--master` URL format in the `spark-submit` command.
   
   With the following Docker Compose configuration (abridged):
   ```yml
   services:
     airflow-webserver:
       build:
         context: .
         dockerfile: ./airflow/Dockerfile # installed java and apache-airflow-providers-apache-spark
       container_name: airflow-webserver
       environment:
         - AIRFLOW_CONN_SPARK_DEFAULT=spark://spark-master:7077
         ...
   
     airflow-scheduler:
       build:
         context: .
         dockerfile: ./airflow/Dockerfile # installed java and apache-airflow-providers-apache-spark
       container_name: airflow-scheduler
       environment:
         - AIRFLOW_CONN_SPARK_DEFAULT=spark://spark-master:7077
         ...
   ```
   
   and a DAG containing:
   ```python
   SparkSubmitOperator(
           ...
           conn_id='spark_default',  
           ...
   )
   ```
   
   the task fails with the following log:
   ```log
   INFO - Spark-Submit cmd: spark-submit --master spark-master:7077 --conf ... /opt/spark/jobs/test_simple_job.py
   ...
   INFO - : org.apache.spark.SparkException: Could not parse Master URL: 'spark-master:7077'
   ...
   ERROR - Failed to execute job 2 for task spark_submit_task (Cannot execute: spark-submit --master spark-master:7077 --conf ... /opt/spark/jobs/test_simple_job.py. Error code is: 1.; 59)
   ```
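   
   For context, Airflow splits a connection URI into `conn_type`, `host`, and `port`, so the `spark://` scheme lands in `conn_type` rather than in the host. A minimal sketch to confirm the parsing (runnable inside either Airflow container):
   ```python
   from airflow.models import Connection
   
   # Parse the same URI that docker-compose exports as AIRFLOW_CONN_SPARK_DEFAULT.
   conn = Connection(conn_id="spark_default", uri="spark://spark-master:7077")
   
   print(conn.conn_type)  # "spark" -- the scheme is captured here, not in the host
   print(conn.host)       # "spark-master"
   print(conn.port)       # 7077
   # Rebuilding the master as f"{conn.host}:{conn.port}" therefore drops the
   # "spark://" prefix and yields the unparsable "spark-master:7077".
   ```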
   
   ### What you think should happen instead
   
   The `spark-submit` command should use the correct `--master` URL format:
   `--master spark://spark-master:7077`
   
   The expected command should look like this:
   ```bash
   spark-submit --master spark://spark-master:7077 /opt/spark/jobs/my_spark_job.py
   ```
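   
   For reference, `spark-submit` recognizes only a fixed set of master URL formats, and a bare `host:port` is not among them. Per the Spark documentation, valid forms include:
   ```
   local[*]                 # run locally with as many threads as cores
   spark://HOST:PORT        # standalone cluster (the case here)
   yarn                     # YARN, resolved from HADOOP_CONF_DIR
   k8s://https://HOST:PORT  # Kubernetes
   ```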
   
   ### How to reproduce
   
   1. Clone the repository and switch to the issue-report branch:
      ```bash
      git clone -b issue-report https://github.com/yehoon17/data-analysis.git
      ```
   2. Start the Docker Compose setup:
      ```bash
      docker compose -f docker-compose.issue.yml up -d
      ```
   3. Trigger the DAG with `dag_id=spark_job_dag`.
   
   The error described above will occur when the DAG attempts to run the Spark 
job.
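   
   Step 3 can also be done from the command line inside the scheduler container (equivalent to triggering via the UI; assumes the container names from the compose file below):
   ```bash
   docker exec airflow-scheduler airflow dags trigger spark_job_dag
   ```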
   
   ### Details
   `docker-compose.issue.yml`:
   ```yml
   services:
     postgres:
       image: postgres:17.2
       container_name: postgres
       env_file:
         - ./.env
       ports:
         - "5433:5432"
       volumes:
         - ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql
   
     airflow-webserver:
       build:
         context: .
         dockerfile: ./airflow/Dockerfile
       container_name: airflow-webserver
       environment:
         - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=${AIRFLOW__DATABASE__SQL_ALCHEMY_CONN}
         - AIRFLOW_CONN_POSTGRES_DEFAULT=${AIRFLOW_CONN_POSTGRES_DEFAULT}
         - AIRFLOW_CONN_SPARK_DEFAULT=spark://spark-master:7077
       depends_on:
         - postgres
       volumes:
         - ./airflow/dags:/opt/airflow/dags
         - ./airflow/logs:/opt/airflow/logs
         - ./spark/jobs:/opt/spark/jobs
         - ./raw_data:/opt/airflow/raw_data
       ports:
         - "8080:8080"
       command: >
         bash -c "airflow db migrate &&
                   airflow users create --username admin --password admin 
--firstname Admin --lastname User --role Admin --email [email protected] &&
                   airflow webserver"
   
     airflow-scheduler:
       build:
         context: .
         dockerfile: ./airflow/Dockerfile
       container_name: airflow-scheduler
       environment:
         - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=${AIRFLOW__DATABASE__SQL_ALCHEMY_CONN}
         - AIRFLOW_CONN_POSTGRES_DEFAULT=${AIRFLOW_CONN_POSTGRES_DEFAULT}
         - AIRFLOW_CONN_SPARK_DEFAULT=spark://spark-master:7077
       depends_on:
         - postgres
       volumes:
         - ./airflow/dags:/opt/airflow/dags
         - ./airflow/logs:/opt/airflow/logs
         - ./spark/jobs:/opt/spark/jobs
         - ./raw_data:/opt/airflow/raw_data
       command: >
         bash -c "airflow scheduler"
       restart: always
   
     spark-master:
       image: bitnami/spark:3.5.4
       container_name: spark-master
       environment:
         - SPARK_MODE=master
         - SPARK_RPC_AUTHENTICATION_ENABLED=no
         - SPARK_RPC_ENCRYPTION_ENABLED=no
         - SPARK_LOCAL_IP=spark-master
       ports:
         - "7077:7077"  # Spark Master Port
         - "18080:8080"  # Spark Web UI
       volumes:
         - ./raw_data:/opt/spark/raw_data
         - ./spark/jobs:/opt/spark/jobs
   
     spark-worker:
       image: bitnami/spark:3.5.4
       container_name: spark-worker
       environment:
         - SPARK_MODE=worker
         - SPARK_MASTER_URL=spark://spark-master:7077
         - SPARK_RPC_AUTHENTICATION_ENABLED=no
         - SPARK_RPC_ENCRYPTION_ENABLED=no
       depends_on:
         - spark-master
       ports:
         - "18081:8081"  # Spark Worker UI
       volumes:
         - ./raw_data:/opt/spark/raw_data
   ```
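   
   (Not required for the repro, but to rule out networking as the cause, the master port can be probed from the Airflow containers; a quick check using bash's `/dev/tcp`, assuming the service names above:)
   ```bash
   docker exec airflow-scheduler bash -c 'exec 3<>/dev/tcp/spark-master/7077 && echo reachable'
   ```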
   
   `airflow/Dockerfile`:
   ```dockerfile
   FROM apache/airflow:2.10.4
   
   # Switch to root user to install system-level dependencies
   USER root
   
   # Install required libraries and clean up in a single layer
   RUN apt-get update && apt-get install -y --no-install-recommends \
       gcc \
       # g++ \
       heimdal-dev \
       # libsasl2-dev \
       openjdk-17-jre-headless \
     && apt-get autoremove -yqq --purge \
     && apt-get clean \
     && rm -rf /var/lib/apt/lists/*
   
   # Set JAVA_HOME environment variable for Spark compatibility
   ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
   ENV PATH=$JAVA_HOME/bin:$PATH
   
   # Switch back to airflow user
   USER airflow
   
   # Copy and install Python dependencies
   COPY requirements.txt /requirements.txt
   RUN pip install --no-cache-dir -r /requirements.txt \
     && pip uninstall -y argparse
   ```
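   
   A quick sanity check that the image built from this Dockerfile has Java on the `PATH` (without it, `spark-submit` fails much earlier and with a different error):
   ```bash
   docker exec airflow-scheduler java -version   # expects: openjdk version "17..."
   ```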
   
   `airflow/dags/test_spark_dag.py`:
   ```python
   from airflow import DAG
   from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
   from datetime import datetime, timedelta
   
   # Default arguments for the DAG
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5),
   }
   
   # Define the DAG
   with DAG(
       dag_id='spark_job_dag',
       default_args=default_args,
       description='Run a Spark job using Airflow',
       schedule_interval=None,  
       start_date=datetime(2023, 1, 1),
       catchup=False,
   ) as dag:
   
       # SparkSubmitOperator to run the Spark job
       run_spark_job = SparkSubmitOperator(
           task_id='spark_submit_task',
           application='/opt/spark/jobs/test_simple_job.py',  
           conn_id='spark_default',  
           name='example_spark_job',
           execution_timeout=timedelta(minutes=10),
           conf={
               'spark.cores.max': '2',
               'spark.executor.memory': '1G',
               'spark.executor.instances': '1',
           },
           executor_cores=1,
           executor_memory='1G',
           driver_memory='512M',
           verbose=True
       )
   
       run_spark_job
   ```
   
   `spark/jobs/test_simple_job.py`:
   ```python
   from pyspark.sql import SparkSession
   
   def main():
       # Initialize a Spark session
       spark = SparkSession.builder \
           .appName("SimplePySparkJob") \
           .getOrCreate()
   
       # Create a DataFrame
       data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
       df = spark.createDataFrame(data, ["name", "value"])
   
       # Show the DataFrame
       df.show()
   
       # Perform some transformations and actions
       result = df.groupBy("value").count()
       result.show()
   
       # Stop the Spark session
       spark.stop()
   
   if __name__ == "__main__":
       main()
   ```
   
   ### Anything else
   
   #### **Resolution**
   To work around the issue until a proper fix is released, patch the `conn_data["master"]` assignment in `SparkSubmitHook` inside the running containers:
   1. Access the relevant containers:
      ```bash
      docker exec -it airflow-webserver /bin/bash
      docker exec -it airflow-scheduler /bin/bash
      ```
   2. Edit the installed hook. For `apache-airflow-providers-apache-spark==5.0.0`, the following `sed` rewrites line 271 of `spark_submit.py`:
      ```bash
      sed -i '271s|conn_data\["master"\] = f"{conn.host}:{conn.port}"|conn_data["master"] = f"{conn.conn_type}://{conn.host}:{conn.port}"|' /home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/spark/hooks/spark_submit.py
      ```
      In effect, it changes this line:
      ```python
      conn_data["master"] = f"{conn.host}:{conn.port}"
      ```
      To this:
      ```python
      conn_data["master"] = f"{conn.conn_type}://{conn.host}:{conn.port}"
      ```
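   
   To verify the patch without submitting a job, the hook's master resolution can be inspected directly. A minimal sketch, noting that `_resolve_connection()` is a private helper and may change between provider versions:
   ```python
   from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook
   
   hook = SparkSubmitHook(conn_id="spark_default")
   conn_data = hook._resolve_connection()  # private helper, not public API
   print(conn_data["master"])
   # unpatched 5.0.0: "spark-master:7077"
   # patched:         "spark://spark-master:7077"
   ```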
   
   #### **Outcome**
   After making this change, the `SparkSubmitOperator` correctly constructs the `--master` argument with the `spark://` prefix, resolving the error `org.apache.spark.SparkException: Could not parse Master URL: 'spark-master:7077'`.
   
   Updated Airflow log output:
   ```log
   INFO - Spark-Submit cmd: spark-submit --master spark://spark-master:7077 --name example_spark_job /opt/airflow/spark_jobs/my_spark_job.py
   ...
   INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://spark-master:7077...
   ...
   INFO - 25/01/27 21:04:20 INFO TransportClientFactory: Successfully created connection to spark-master/172.18.0.5:7077 after 121 ms (0 ms spent in bootstraps)
   ...
   INFO - Marking task as SUCCESS. dag_id=spark_job_dag, task_id=spark_submit_task, run_id=manual__2025-01-27T21:03:55.131353+00:00, execution_date=20250127T210355, start_date=20250127T210402, end_date=20250127T211001
   ...
   ```
   
   
   Full story: https://github.com/yehoon17/data-analysis/issues/5
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

