This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f83e831d2 Validate conn_prefix in extra field for Spark JDBC hook (#32946)
4f83e831d2 is described below

commit 4f83e831d2e6985b6c82b2e0c45673b58ef81074
Author: Pankaj Koti <[email protected]>
AuthorDate: Mon Jul 31 19:00:42 2023 +0530

    Validate conn_prefix in extra field for Spark JDBC hook (#32946)
    
    The `conn_prefix` in `extras` should not contain a `?` as that is
    usually meant for appending query params to the URL
    and `query_params` are not intended to be included in the
    `conn_prefix` section of the `extras`.
---
 airflow/providers/apache/spark/hooks/spark_jdbc.py   |  2 ++
 .../providers/apache/spark/hooks/test_spark_jdbc.py  | 20 ++++++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc.py b/airflow/providers/apache/spark/hooks/spark_jdbc.py
index f04c476f1a..5d25866eb8 100644
--- a/airflow/providers/apache/spark/hooks/spark_jdbc.py
+++ b/airflow/providers/apache/spark/hooks/spark_jdbc.py
@@ -181,6 +181,8 @@ class SparkJDBCHook(SparkSubmitHook):
         arguments = []
         arguments += ["-cmdType", self._cmd_type]
         if self._jdbc_connection["url"]:
+            if "?" in jdbc_conn["conn_prefix"]:
+                raise ValueError("The jdbc extra conn_prefix should not contain a '?'")
             arguments += [
                 "-url",
                 
f"{jdbc_conn['conn_prefix']}{jdbc_conn['url']}/{jdbc_conn['schema']}",
diff --git a/tests/providers/apache/spark/hooks/test_spark_jdbc.py b/tests/providers/apache/spark/hooks/test_spark_jdbc.py
index 495f7353a0..fbec739c9f 100644
--- a/tests/providers/apache/spark/hooks/test_spark_jdbc.py
+++ b/tests/providers/apache/spark/hooks/test_spark_jdbc.py
@@ -17,6 +17,8 @@
 # under the License.
 from __future__ import annotations
 
+from unittest.mock import patch
+
 import pytest
 
 from airflow.models import Connection
@@ -108,6 +110,18 @@ class TestSparkJDBCHook:
                 extra='{"conn_prefix":"jdbc:postgresql://"}',
             )
         )
+        db.merge_conn(
+            Connection(
+                conn_id="jdbc-invalid-extra-conn-prefix",
+                conn_type="postgres",
+                host="localhost",
+                schema="default",
+                port=5432,
+                login="user",
+                password="supersecret",
+                
extra='{"conn_prefix":"jdbc:mysql://some_host:8085/test?some_query_param=true#"}',
+            )
+        )
 
     def test_resolve_jdbc_connection(self):
         # Given
@@ -184,3 +198,9 @@ class TestSparkJDBCHook:
     def test_invalid_schema(self):
         with pytest.raises(ValueError, match="schema should not contain a"):
             SparkJDBCHook(jdbc_conn_id="jdbc-invalid-schema", **self._config)
+
+    
@patch("airflow.providers.apache.spark.hooks.spark_submit.SparkSubmitHook.submit")
+    def test_invalid_extra_conn_prefix(self, mock_submit):
+        hook = SparkJDBCHook(jdbc_conn_id="jdbc-invalid-extra-conn-prefix", 
**self._config)
+        with pytest.raises(ValueError, match="extra conn_prefix should not contain a"):
+            hook.submit_jdbc_job()

Reply via email to