This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 937be1dd985 fix: resolve connection master URL construction for 
various protocols in SparkSubmitHook (#61528)
937be1dd985 is described below

commit 937be1dd985ccf1a8c68dbddba3bd7a8e771dfa9
Author: Hossein Torabi <[email protected]>
AuthorDate: Wed Feb 25 07:56:18 2026 +0100

    fix: resolve connection master URL construction for various protocols in 
SparkSubmitHook (#61528)
    
    Signed-off-by: Hossein Torabi <[email protected]>
---
 .pre-commit-config.yaml                            |  1 +
 .../apache/spark/docs/connections/spark-submit.rst | 18 ++++---
 .../providers/apache/spark/hooks/spark_submit.py   | 16 +++++--
 .../unit/apache/spark/hooks/test_spark_submit.py   | 56 ++++++++++++++++++++++
 4 files changed, 81 insertions(+), 10 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index ce41f5d634c..1ee9b0d98b3 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -627,6 +627,7 @@ repos:
           
^providers/apache/hive/src/airflow/providers/apache/hive/transfers/vertica_to_hive\.py$|
           ^providers/apache/kafka/docs/connections/kafka\.rst$|
           ^providers/apache/spark/docs/decorators/pyspark\.rst$|
+          ^providers/apache/spark/docs/connections/spark-submit.rst$|
           
^providers/apache/spark/src/airflow/providers/apache/spark/decorators/|
           ^providers/apache/spark/src/airflow/providers/apache/spark/hooks/|
           
^providers/apache/spark/src/airflow/providers/apache/spark/operators/|
diff --git a/providers/apache/spark/docs/connections/spark-submit.rst 
b/providers/apache/spark/docs/connections/spark-submit.rst
index f38a2908ba4..3b808bfc32b 100644
--- a/providers/apache/spark/docs/connections/spark-submit.rst
+++ b/providers/apache/spark/docs/connections/spark-submit.rst
@@ -49,17 +49,21 @@ Spark binary (optional)
 Kubernetes namespace (optional, only applies to spark on kubernetes 
applications)
     Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster 
resources between multiple users (via resource quota).
 
-When specifying the connection in environment variable you should specify
-it using URI syntax.
+.. note::
 
-Note that all components of the URI should be URL-encoded. The URI and the 
mongo
-connection string are not the same.
+  When specifying the connection in environment variable you should specify
+  it using URI syntax.
+  You can provide a standard Spark master URI directly.
+  The master URL will be parsed correctly without needing repeated prefixes 
such as ``spark://spark://...``.
+  Ensure all URI components are URL-encoded.
 
-For example:
+  For example:
+
+  .. code-block:: bash
+
+     export 
AIRFLOW_CONN_SPARK_DEFAULT='spark://mysparkcluster.com:80?deploy-mode=cluster&spark_binary=command&namespace=kube+namespace'
 
-.. code-block:: bash
 
-   export 
AIRFLOW_CONN_SPARK_DEFAULT='spark://mysparkcluster.com:80?deploy-mode=cluster&spark_binary=command&namespace=kube+namespace'
 
 .. warning::
 
diff --git 
a/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py
 
b/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py
index 1fe3173bc41..0cb93afb861 100644
--- 
a/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py
+++ 
b/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -265,10 +265,20 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
             # Master can be local, yarn, spark://HOST:PORT, mesos://HOST:PORT 
and
             # k8s://https://<HOST>:<PORT>
             conn = self.get_connection(self._conn_id)
-            if conn.port:
-                conn_data["master"] = f"{conn.host}:{conn.port}"
-            else:
+
+            # When connection is created from URI, the scheme (spark://, 
k8s://, etc.)
+            # is stored in conn_type, and conn.host contains only the hostname.
+            # When created from UI, conn_type is typically "spark" and 
conn.host
+            # may contain the full master URL (e.g., k8s://https://host).
+            if conn.conn_type == "spark" and conn.host and ("://" in conn.host 
or not conn.port):
+                # UI-based spark connection where host contains the full 
master URL
                 conn_data["master"] = conn.host
+            else:
+                # Reconstruct URL with conn_type as protocol
+                conn_data["master"] = f"{conn.conn_type}://{conn.host or ''}"
+
+            # Append port if provided
+            conn_data["master"] = f"{conn_data['master']}:{conn.port}" if 
conn.port else conn_data["master"]
 
             # Determine optional yarn queue from the extra field
             extra = conn.extra_dejson
diff --git 
a/providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py 
b/providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py
index 79a7bcb1f65..35eb59041f0 100644
--- a/providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py
+++ b/providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py
@@ -167,6 +167,37 @@ class TestSparkSubmitHook:
                 extra='{"keytab": "privileged_user.keytab"}',
             )
         )
+        create_connection_without_db(
+            Connection(
+                conn_id="spark_uri_with_protocol",
+                uri="spark://spark-master:7077",
+            )
+        )
+        create_connection_without_db(
+            Connection(
+                conn_id="spark_uri_yarn",
+                uri="yarn://yarn-master",
+            )
+        )
+        create_connection_without_db(
+            Connection(
+                conn_id="mesos_uri",
+                uri="mesos://mesos-host:5050",
+            )
+        )
+        create_connection_without_db(
+            Connection(
+                conn_id="k8s_uri",
+                uri="k8s://https://k8s-host:443",
+            )
+        )
+
+        create_connection_without_db(
+            Connection(
+                conn_id="local_uri",
+                uri="spark://local",
+            )
+        )
 
     @pytest.mark.db_test
     @patch(
@@ -532,6 +563,31 @@ class TestSparkSubmitHook:
         assert connection == expected_spark_connection
         assert cmd[0] == "spark3-submit"
 
+    def test_resolve_connection_spark_uri_with_protocol(self):
+        hook = SparkSubmitHook(conn_id="spark_uri_with_protocol")
+        connection = hook._resolve_connection()
+        assert connection["master"] == "spark://spark-master:7077"
+
+    def test_resolve_connection_spark_uri_yarn(self):
+        hook = SparkSubmitHook(conn_id="spark_uri_yarn")
+        connection = hook._resolve_connection()
+        assert connection["master"] == "yarn://yarn-master"
+
+    def test_resolve_connection_mesos_uri(self):
+        hook = SparkSubmitHook(conn_id="mesos_uri")
+        connection = hook._resolve_connection()
+        assert connection["master"] == "mesos://mesos-host:5050"
+
+    def test_resolve_connection_k8s_uri(self):
+        hook = SparkSubmitHook(conn_id="k8s_uri")
+        connection = hook._resolve_connection()
+        assert connection["master"] == "k8s://https://k8s-host:443"
+
+    def test_resolve_connection_local_uri(self):
+        hook = SparkSubmitHook(conn_id="local_uri")
+        connection = hook._resolve_connection()
+        assert connection["master"] == "local"
+
     def test_resolve_connection_custom_spark_binary_allowed_in_hook(self):
         SparkSubmitHook(conn_id="spark_binary_set", 
spark_binary="another-custom-spark-submit")
 

Reply via email to