This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4f83e831d2 Validate conn_prefix in extra field for Spark JDBC hook (#32946)
4f83e831d2 is described below
commit 4f83e831d2e6985b6c82b2e0c45673b58ef81074
Author: Pankaj Koti <[email protected]>
AuthorDate: Mon Jul 31 19:00:42 2023 +0530
Validate conn_prefix in extra field for Spark JDBC hook (#32946)
The `conn_prefix` in `extras` should not contain a `?` as that is
usually meant for appending query params to the URL
and `query_params` are not intended to be included in the
`conn_prefix` section of the `extras`.
---
airflow/providers/apache/spark/hooks/spark_jdbc.py | 2 ++
.../providers/apache/spark/hooks/test_spark_jdbc.py | 20 ++++++++++++++++++++
2 files changed, 22 insertions(+)
diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc.py
b/airflow/providers/apache/spark/hooks/spark_jdbc.py
index f04c476f1a..5d25866eb8 100644
--- a/airflow/providers/apache/spark/hooks/spark_jdbc.py
+++ b/airflow/providers/apache/spark/hooks/spark_jdbc.py
@@ -181,6 +181,8 @@ class SparkJDBCHook(SparkSubmitHook):
arguments = []
arguments += ["-cmdType", self._cmd_type]
if self._jdbc_connection["url"]:
+ if "?" in jdbc_conn["conn_prefix"]:
+ raise ValueError("The jdbc extra conn_prefix should not contain a '?'")
arguments += [
"-url",
f"{jdbc_conn['conn_prefix']}{jdbc_conn['url']}/{jdbc_conn['schema']}",
diff --git a/tests/providers/apache/spark/hooks/test_spark_jdbc.py
b/tests/providers/apache/spark/hooks/test_spark_jdbc.py
index 495f7353a0..fbec739c9f 100644
--- a/tests/providers/apache/spark/hooks/test_spark_jdbc.py
+++ b/tests/providers/apache/spark/hooks/test_spark_jdbc.py
@@ -17,6 +17,8 @@
# under the License.
from __future__ import annotations
+from unittest.mock import patch
+
import pytest
from airflow.models import Connection
@@ -108,6 +110,18 @@ class TestSparkJDBCHook:
extra='{"conn_prefix":"jdbc:postgresql://"}',
)
)
+ db.merge_conn(
+ Connection(
+ conn_id="jdbc-invalid-extra-conn-prefix",
+ conn_type="postgres",
+ host="localhost",
+ schema="default",
+ port=5432,
+ login="user",
+ password="supersecret",
+ extra='{"conn_prefix":"jdbc:mysql://some_host:8085/test?some_query_param=true#"}',
+ )
+ )
def test_resolve_jdbc_connection(self):
# Given
@@ -184,3 +198,9 @@ class TestSparkJDBCHook:
def test_invalid_schema(self):
with pytest.raises(ValueError, match="schema should not contain a"):
SparkJDBCHook(jdbc_conn_id="jdbc-invalid-schema", **self._config)
+
+
@patch("airflow.providers.apache.spark.hooks.spark_submit.SparkSubmitHook.submit")
+ def test_invalid_extra_conn_prefix(self, mock_submit):
+ hook = SparkJDBCHook(jdbc_conn_id="jdbc-invalid-extra-conn-prefix", **self._config)
+ with pytest.raises(ValueError, match="extra conn_prefix should not contain a"):
+ hook.submit_jdbc_job()