This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 937be1dd985 fix: resolve connection master URL construction for various protocols in SparkSubmitHook (#61528)
937be1dd985 is described below
commit 937be1dd985ccf1a8c68dbddba3bd7a8e771dfa9
Author: Hossein Torabi <[email protected]>
AuthorDate: Wed Feb 25 07:56:18 2026 +0100
    fix: resolve connection master URL construction for various protocols in SparkSubmitHook (#61528)
Signed-off-by: Hossein Torabi <[email protected]>
---
.pre-commit-config.yaml | 1 +
.../apache/spark/docs/connections/spark-submit.rst | 18 ++++---
.../providers/apache/spark/hooks/spark_submit.py | 16 +++++--
.../unit/apache/spark/hooks/test_spark_submit.py | 56 ++++++++++++++++++++++
4 files changed, 81 insertions(+), 10 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index ce41f5d634c..1ee9b0d98b3 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -627,6 +627,7 @@ repos:
^providers/apache/hive/src/airflow/providers/apache/hive/transfers/vertica_to_hive\.py$|
^providers/apache/kafka/docs/connections/kafka\.rst$|
^providers/apache/spark/docs/decorators/pyspark\.rst$|
+ ^providers/apache/spark/docs/connections/spark-submit.rst$|
^providers/apache/spark/src/airflow/providers/apache/spark/decorators/|
^providers/apache/spark/src/airflow/providers/apache/spark/hooks/|
^providers/apache/spark/src/airflow/providers/apache/spark/operators/|
diff --git a/providers/apache/spark/docs/connections/spark-submit.rst
b/providers/apache/spark/docs/connections/spark-submit.rst
index f38a2908ba4..3b808bfc32b 100644
--- a/providers/apache/spark/docs/connections/spark-submit.rst
+++ b/providers/apache/spark/docs/connections/spark-submit.rst
@@ -49,17 +49,21 @@ Spark binary (optional)
Kubernetes namespace (optional, only applies to spark on kubernetes
applications)
Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster
resources between multiple users (via resource quota).
-When specifying the connection in environment variable you should specify
-it using URI syntax.
+.. note::
-Note that all components of the URI should be URL-encoded. The URI and the mongo
-connection string are not the same.
+ When specifying the connection in environment variable you should specify
+ it using URI syntax.
+ You can provide a standard Spark master URI directly.
+    The master URL will be parsed correctly without needing repeated prefixes such as ``spark://spark://...``
+ Ensure all URI components are URL-encoded.
-For example:
+ For example:
+
+ .. code-block:: bash
+
+      export AIRFLOW_CONN_SPARK_DEFAULT='spark://mysparkcluster.com:80?deploy-mode=cluster&spark_binary=command&namespace=kube+namespace'
-.. code-block:: bash
-    export AIRFLOW_CONN_SPARK_DEFAULT='spark://mysparkcluster.com:80?deploy-mode=cluster&spark_binary=command&namespace=kube+namespace'
.. warning::
diff --git
a/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py
b/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py
index 1fe3173bc41..0cb93afb861 100644
---
a/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py
+++
b/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -265,10 +265,20 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         # Master can be local, yarn, spark://HOST:PORT, mesos://HOST:PORT and
         # k8s://https://<HOST>:<PORT>
         conn = self.get_connection(self._conn_id)
-        if conn.port:
-            conn_data["master"] = f"{conn.host}:{conn.port}"
-        else:
+
+        # When connection is created from URI, the scheme (spark://, k8s://, etc.)
+        # is stored in conn_type, and conn.host contains only the hostname.
+        # When created from UI, conn_type is typically "spark" and conn.host
+        # may contain the full master URL (e.g., k8s://https://host).
+        if conn.conn_type == "spark" and conn.host and ("://" in conn.host or not conn.port):
+            # UI-based spark connection where host contains the full master URL
             conn_data["master"] = conn.host
+        else:
+            # Reconstruct URL with conn_type as protocol
+            conn_data["master"] = f"{conn.conn_type}://{conn.host or ''}"
+
+        # Append port if provided
+        conn_data["master"] = f"{conn_data['master']}:{conn.port}" if conn.port else conn_data["master"]
# Determine optional yarn queue from the extra field
extra = conn.extra_dejson
diff --git
a/providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py
b/providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py
index 79a7bcb1f65..35eb59041f0 100644
--- a/providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py
+++ b/providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py
@@ -167,6 +167,37 @@ class TestSparkSubmitHook:
extra='{"keytab": "privileged_user.keytab"}',
)
)
+ create_connection_without_db(
+ Connection(
+ conn_id="spark_uri_with_protocol",
+ uri="spark://spark-master:7077",
+ )
+ )
+ create_connection_without_db(
+ Connection(
+ conn_id="spark_uri_yarn",
+ uri="yarn://yarn-master",
+ )
+ )
+ create_connection_without_db(
+ Connection(
+ conn_id="mesos_uri",
+ uri="mesos://mesos-host:5050",
+ )
+ )
+ create_connection_without_db(
+ Connection(
+ conn_id="k8s_uri",
+ uri="k8s://https://k8s-host:443",
+ )
+ )
+
+ create_connection_without_db(
+ Connection(
+ conn_id="local_uri",
+ uri="spark://local",
+ )
+ )
@pytest.mark.db_test
@patch(
@@ -532,6 +563,31 @@ class TestSparkSubmitHook:
assert connection == expected_spark_connection
assert cmd[0] == "spark3-submit"
+ def test_resolve_connection_spark_uri_with_protocol(self):
+ hook = SparkSubmitHook(conn_id="spark_uri_with_protocol")
+ connection = hook._resolve_connection()
+ assert connection["master"] == "spark://spark-master:7077"
+
+ def test_resolve_connection_spark_uri_yarn(self):
+ hook = SparkSubmitHook(conn_id="spark_uri_yarn")
+ connection = hook._resolve_connection()
+ assert connection["master"] == "yarn://yarn-master"
+
+ def test_resolve_connection_mesos_uri(self):
+ hook = SparkSubmitHook(conn_id="mesos_uri")
+ connection = hook._resolve_connection()
+ assert connection["master"] == "mesos://mesos-host:5050"
+
+ def test_resolve_connection_k8s_uri(self):
+ hook = SparkSubmitHook(conn_id="k8s_uri")
+ connection = hook._resolve_connection()
+ assert connection["master"] == "k8s://https://k8s-host:443"
+
+ def test_resolve_connection_local_uri(self):
+ hook = SparkSubmitHook(conn_id="local_uri")
+ connection = hook._resolve_connection()
+ assert connection["master"] == "local"
+
def test_resolve_connection_custom_spark_binary_allowed_in_hook(self):
        SparkSubmitHook(conn_id="spark_binary_set", spark_binary="another-custom-spark-submit")