Nataneljpwd commented on code in PR #68144:
URL: https://github.com/apache/airflow/pull/68144#discussion_r3369875261


##########
providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py:
##########
@@ -50,6 +50,8 @@
 
 
 HIVE_QUEUE_PRIORITIES = ["VERY_HIGH", "HIGH", "NORMAL", "LOW", "VERY_LOW"]
+HIVE_CLI_TRANSPORT_MODES = {"binary", "http"}

Review Comment:
   Why is this a special case? Why specifically this config? Why not treat it 
as any other one?



##########
providers/apache/hive/docs/connections/hive_cli.rst:
##########
@@ -80,9 +80,36 @@ High Availability (optional)
 Ssl (optional)
     Specify as ``True`` to enable SSL for your high availability connection.
 
+Transport Mode (optional)
+    Specify the JDBC ``transportMode`` parameter. Supported values are 
``binary`` and ``http``.
+
 Zoo Keeper Namespace (optional)
     Zoo keeper namespace for high availability.
 
+Additional JDBC Parameters
+--------------------------
+
+Dag authors can pass additional Beeline JDBC URL parameters to ``HiveCliHook`` 
or ``HiveOperator``
+with the ``jdbc_params`` argument:
+
+.. code-block:: python
+
+   HiveCliHook(
+       jdbc_params={
+           "transportMode": "http",
+           "sslTrustStore": "/opt/hive/truststore.jks",
+           "trustStorePassword": "secret",
+       }
+   )
+
+These parameters are appended to the Beeline JDBC URL. Parameter names must 
start with a letter
+and contain only letters, digits, dots, underscores, or hyphens. Values must 
not contain semicolons.

Review Comment:
   Isn't it simpler to give a regex for it? Perhaps not instead of the verbose 
explanation, but in addition to it.



##########
providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py:
##########
@@ -50,6 +50,8 @@
 
 
 HIVE_QUEUE_PRIORITIES = ["VERY_HIGH", "HIGH", "NORMAL", "LOW", "VERY_LOW"]
+HIVE_CLI_TRANSPORT_MODES = {"binary", "http"}
+JDBC_PARAMETER_NAME_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9._-]*$")

Review Comment:
   Can it end with a dash? Or a period (which I think might need to be 
escaped)? Or an underscore?



##########
providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py:
##########
@@ -212,6 +223,55 @@ def _prepare_cli_cmd(self) -> list[Any]:
 
         return [hive_bin, *cmd_extra, *hive_params_list]
 
+    def _get_jdbc_url_parameters(self) -> dict[str, str]:
+        jdbc_params = self._get_connection_jdbc_url_parameters()
+        
jdbc_params.update(self._validate_jdbc_url_parameters(self.jdbc_params))
+        return jdbc_params
+
+    def _get_connection_jdbc_url_parameters(self) -> dict[str, str]:
+        extra_dejson = getattr(self.conn, "extra_dejson", {})
+        transport_mode = extra_dejson.get("transport_mode")
+        if not transport_mode:
+            return {}
+        transport_mode = str(transport_mode).lower()
+        if transport_mode not in HIVE_CLI_TRANSPORT_MODES:
+            allowed_modes = ", ".join(sorted(HIVE_CLI_TRANSPORT_MODES))
+            raise ValueError(f"The transport_mode connection extra should be 
one of: {allowed_modes}")
+        return {"transportMode": transport_mode}
+
+    @classmethod
+    def _validate_jdbc_url_parameters(cls, jdbc_params: Mapping[str, Any]) -> 
dict[str, str]:
+        return {
+            cls._validate_jdbc_url_parameter_name(name): 
cls._validate_jdbc_url_parameter_value(name, value)
+            for name, value in jdbc_params.items()
+        }
+
+    @staticmethod
+    def _validate_jdbc_url_parameter_name(name: str) -> str:
+        if not isinstance(name, str) or not 
JDBC_PARAMETER_NAME_PATTERN.fullmatch(name):
+            raise ValueError(
+                "JDBC parameter names must be non-empty strings that start 
with a letter and contain "
+                "only letters, digits, dots, underscores, or hyphens"
+            )
+        return name
+
+    @staticmethod
+    def _validate_jdbc_url_parameter_value(name: str, value: Any) -> str:
+        if value is None:
+            raise ValueError(f"JDBC parameter {name!r} value should not be 
None")
+        value = str(value)
+        if ";" in value:
+            raise ValueError(f"JDBC parameter {name!r} value should not 
contain the ';' character")
+        return value

Review Comment:
   Why not just create a regex for this?



##########
providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py:
##########
@@ -212,6 +223,55 @@ def _prepare_cli_cmd(self) -> list[Any]:
 
         return [hive_bin, *cmd_extra, *hive_params_list]
 
+    def _get_jdbc_url_parameters(self) -> dict[str, str]:
+        jdbc_params = self._get_connection_jdbc_url_parameters()
+        
jdbc_params.update(self._validate_jdbc_url_parameters(self.jdbc_params))
+        return jdbc_params
+
+    def _get_connection_jdbc_url_parameters(self) -> dict[str, str]:
+        extra_dejson = getattr(self.conn, "extra_dejson", {})
+        transport_mode = extra_dejson.get("transport_mode")
+        if not transport_mode:
+            return {}
+        transport_mode = str(transport_mode).lower()
+        if transport_mode not in HIVE_CLI_TRANSPORT_MODES:
+            allowed_modes = ", ".join(sorted(HIVE_CLI_TRANSPORT_MODES))
+            raise ValueError(f"The transport_mode connection extra should be 
one of: {allowed_modes}")
+        return {"transportMode": transport_mode}
+
+    @classmethod
+    def _validate_jdbc_url_parameters(cls, jdbc_params: Mapping[str, Any]) -> 
dict[str, str]:
+        return {
+            cls._validate_jdbc_url_parameter_name(name): 
cls._validate_jdbc_url_parameter_value(name, value)
+            for name, value in jdbc_params.items()
+        }
+
+    @staticmethod
+    def _validate_jdbc_url_parameter_name(name: str) -> str:
+        if not isinstance(name, str) or not 
JDBC_PARAMETER_NAME_PATTERN.fullmatch(name):
+            raise ValueError(
+                "JDBC parameter names must be non-empty strings that start 
with a letter and contain "
+                "only letters, digits, dots, underscores, or hyphens"
+            )
+        return name
+
+    @staticmethod
+    def _validate_jdbc_url_parameter_value(name: str, value: Any) -> str:
+        if value is None:
+            raise ValueError(f"JDBC parameter {name!r} value should not be 
None")
+        value = str(value)
+        if ";" in value:
+            raise ValueError(f"JDBC parameter {name!r} value should not 
contain the ';' character")
+        return value
+
+    @staticmethod
+    def _append_jdbc_url_parameters(jdbc_url: str, jdbc_params: Mapping[str, 
str]) -> str:
+        if not jdbc_params:
+            return jdbc_url
+        jdbc_url_params = ";".join(f"{name}={value}" for name, value in 
jdbc_params.items())
+        separator = "" if jdbc_url.endswith(";") else ";"
+        return f"{jdbc_url}{separator}{jdbc_url_params}"

Review Comment:
   Can't I already put the params directly in the URL? I understand that is 
not as flexible, but when would I need different JDBC params for the same 
connection?
   
   I'm not saying this isn't needed; rather, I'm asking when it is used, since 
I have not encountered such a use case. We have a pretty strict whitelist for 
per-session configuration changes, though I know other clusters vary, so I am 
just curious.



##########
providers/apache/hive/docs/connections/hive_cli.rst:
##########
@@ -80,9 +80,36 @@ High Availability (optional)
 Ssl (optional)
     Specify as ``True`` to enable SSL for your high availability connection.
 
+Transport Mode (optional)
+    Specify the JDBC ``transportMode`` parameter. Supported values are 
``binary`` and ``http``.
+
 Zoo Keeper Namespace (optional)
     Zoo keeper namespace for high availability.
 
+Additional JDBC Parameters
+--------------------------
+
+Dag authors can pass additional Beeline JDBC URL parameters to ``HiveCliHook`` 
or ``HiveOperator``
+with the ``jdbc_params`` argument:
+
+.. code-block:: python
+
+   HiveCliHook(
+       jdbc_params={
+           "transportMode": "http",
+           "sslTrustStore": "/opt/hive/truststore.jks",
+           "trustStorePassword": "secret",
+       }
+   )
+
+These parameters are appended to the Beeline JDBC URL. Parameter names must 
start with a letter
+and contain only letters, digits, dots, underscores, or hyphens. Values must 
not contain semicolons.
+
+Arbitrary JDBC parameters are not read from connection extras. Only fixed 
connection extras with

Review Comment:
   What are fixed connection extras with bounded values? Extras whose value is 
part of an enum?



##########
providers/apache/hive/docs/connections/hive_cli.rst:
##########
@@ -80,9 +80,36 @@ High Availability (optional)
 Ssl (optional)
     Specify as ``True`` to enable SSL for your high availability connection.
 
+Transport Mode (optional)
+    Specify the JDBC ``transportMode`` parameter. Supported values are 
``binary`` and ``http``.

Review Comment:
   Why was this specifically added? You didn't add it for all other params 
(which is fine) but why did you add this one?



##########
providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py:
##########
@@ -212,6 +223,55 @@ def _prepare_cli_cmd(self) -> list[Any]:
 
         return [hive_bin, *cmd_extra, *hive_params_list]
 
+    def _get_jdbc_url_parameters(self) -> dict[str, str]:
+        jdbc_params = self._get_connection_jdbc_url_parameters()
+        
jdbc_params.update(self._validate_jdbc_url_parameters(self.jdbc_params))
+        return jdbc_params
+
+    def _get_connection_jdbc_url_parameters(self) -> dict[str, str]:
+        extra_dejson = getattr(self.conn, "extra_dejson", {})
+        transport_mode = extra_dejson.get("transport_mode")
+        if not transport_mode:
+            return {}
+        transport_mode = str(transport_mode).lower()
+        if transport_mode not in HIVE_CLI_TRANSPORT_MODES:
+            allowed_modes = ", ".join(sorted(HIVE_CLI_TRANSPORT_MODES))
+            raise ValueError(f"The transport_mode connection extra should be 
one of: {allowed_modes}")
+        return {"transportMode": transport_mode}

Review Comment:
   From this, it looks like the transport mode is the only JDBC param available.



##########
providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py:
##########
@@ -141,6 +148,9 @@ def get_connection_form_widgets(cls) -> dict[str, Any]:
             ),
             "high_availability": BooleanField(lazy_gettext("High Availability 
mode"), default=False),
             "ssl": BooleanField(lazy_gettext("Ssl"), default=True),
+            "transport_mode": StringField(
+                lazy_gettext("Transport Mode"), widget=BS3TextFieldWidget(), 
default=""

Review Comment:
   Why was this introduced?



##########
providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py:
##########
@@ -212,6 +223,55 @@ def _prepare_cli_cmd(self) -> list[Any]:
 
         return [hive_bin, *cmd_extra, *hive_params_list]
 
+    def _get_jdbc_url_parameters(self) -> dict[str, str]:
+        jdbc_params = self._get_connection_jdbc_url_parameters()
+        
jdbc_params.update(self._validate_jdbc_url_parameters(self.jdbc_params))
+        return jdbc_params
+
+    def _get_connection_jdbc_url_parameters(self) -> dict[str, str]:
+        extra_dejson = getattr(self.conn, "extra_dejson", {})
+        transport_mode = extra_dejson.get("transport_mode")
+        if not transport_mode:
+            return {}
+        transport_mode = str(transport_mode).lower()
+        if transport_mode not in HIVE_CLI_TRANSPORT_MODES:
+            allowed_modes = ", ".join(sorted(HIVE_CLI_TRANSPORT_MODES))
+            raise ValueError(f"The transport_mode connection extra should be 
one of: {allowed_modes}")
+        return {"transportMode": transport_mode}
+
+    @classmethod
+    def _validate_jdbc_url_parameters(cls, jdbc_params: Mapping[str, Any]) -> 
dict[str, str]:
+        return {
+            cls._validate_jdbc_url_parameter_name(name): 
cls._validate_jdbc_url_parameter_value(name, value)
+            for name, value in jdbc_params.items()
+        }
+
+    @staticmethod
+    def _validate_jdbc_url_parameter_name(name: str) -> str:
+        if not isinstance(name, str) or not 
JDBC_PARAMETER_NAME_PATTERN.fullmatch(name):
+            raise ValueError(
+                "JDBC parameter names must be non-empty strings that start 
with a letter and contain "
+                "only letters, digits, dots, underscores, or hyphens"
+            )
+        return name

Review Comment:
   Do we really need a method for validating a specific regex? Either make it 
one method for both name and value (passing the regex and error message) or 
inline the checks.



##########
providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py:
##########
@@ -212,6 +223,55 @@ def _prepare_cli_cmd(self) -> list[Any]:
 
         return [hive_bin, *cmd_extra, *hive_params_list]
 
+    def _get_jdbc_url_parameters(self) -> dict[str, str]:
+        jdbc_params = self._get_connection_jdbc_url_parameters()
+        
jdbc_params.update(self._validate_jdbc_url_parameters(self.jdbc_params))
+        return jdbc_params

Review Comment:
   IMO it would be slightly easier to follow if it was inlined



##########
providers/apache/hive/docs/connections/hive_cli.rst:
##########
@@ -80,9 +80,36 @@ High Availability (optional)
 Ssl (optional)
     Specify as ``True`` to enable SSL for your high availability connection.
 
+Transport Mode (optional)
+    Specify the JDBC ``transportMode`` parameter. Supported values are 
``binary`` and ``http``.
+
 Zoo Keeper Namespace (optional)
     Zoo keeper namespace for high availability.
 
+Additional JDBC Parameters
+--------------------------
+
+Dag authors can pass additional Beeline JDBC URL parameters to ``HiveCliHook`` 
or ``HiveOperator``
+with the ``jdbc_params`` argument:
+
+.. code-block:: python
+
+   HiveCliHook(
+       jdbc_params={
+           "transportMode": "http",
+           "sslTrustStore": "/opt/hive/truststore.jks",
+           "trustStorePassword": "secret",
+       }
+   )
+
+These parameters are appended to the Beeline JDBC URL. Parameter names must 
start with a letter
+and contain only letters, digits, dots, underscores, or hyphens. Values must 
not contain semicolons.
+
+Arbitrary JDBC parameters are not read from connection extras. Only fixed 
connection extras with

Review Comment:
   Are they not read from connection extras at all?



##########
providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py:
##########
@@ -212,6 +223,55 @@ def _prepare_cli_cmd(self) -> list[Any]:
 
         return [hive_bin, *cmd_extra, *hive_params_list]
 
+    def _get_jdbc_url_parameters(self) -> dict[str, str]:
+        jdbc_params = self._get_connection_jdbc_url_parameters()
+        
jdbc_params.update(self._validate_jdbc_url_parameters(self.jdbc_params))
+        return jdbc_params
+
+    def _get_connection_jdbc_url_parameters(self) -> dict[str, str]:
+        extra_dejson = getattr(self.conn, "extra_dejson", {})
+        transport_mode = extra_dejson.get("transport_mode")
+        if not transport_mode:
+            return {}
+        transport_mode = str(transport_mode).lower()
+        if transport_mode not in HIVE_CLI_TRANSPORT_MODES:
+            allowed_modes = ", ".join(sorted(HIVE_CLI_TRANSPORT_MODES))
+            raise ValueError(f"The transport_mode connection extra should be 
one of: {allowed_modes}")

Review Comment:
   Why do we treat this as a special case? Why can't it be like other params?



##########
providers/apache/hive/tests/unit/apache/hive/hooks/test_hive.py:
##########
@@ -619,6 +630,89 @@ def test_drop_partition(self, get_metastore_client_mock, 
table_exist_mock):
         assert metastore_mock.drop_partition(self.table, db=self.database, 
part_vals=[DEFAULT_DATE_DS]), ret
 
 
+class TestHiveCliHookJdbcParams:
+    @mock.patch.object(HiveCliHook, "get_connection")
+    def test_jdbc_params_append_to_beeline_url(self, mock_get_connection):
+        mock_get_connection.return_value = get_hive_cli_connection()
+
+        hook = HiveCliHook(
+            jdbc_params={
+                "transportMode": "http",
+                "sslTrustStore": "/opt/hive/truststore.jks",
+                "trustStorePassword": "secret=ok",
+            }
+        )
+
+        assert hook._prepare_cli_cmd() == [
+            "beeline",
+            "-u",
+            '"jdbc:hive2://localhost:10000/default;'
+            
'transportMode=http;sslTrustStore=/opt/hive/truststore.jks;trustStorePassword=secret=ok"',
+        ]
+
+    @mock.patch.object(HiveCliHook, "get_connection")
+    def test_jdbc_params_compose_with_auth_login_and_password(self, 
mock_get_connection):
+        mock_get_connection.return_value = 
get_hive_cli_connection(login="user", password="password")
+
+        hook = HiveCliHook(auth="LDAP", jdbc_params={"transportMode": "http"})
+
+        assert hook._prepare_cli_cmd() == [
+            "beeline",
+            "-u",
+            
'"jdbc:hive2://localhost:10000/default;auth=LDAP;transportMode=http"',
+            "-n",
+            "user",
+            "-p",
+            "password",
+        ]
+
+    @mock.patch.object(HiveCliHook, "get_connection")
+    def test_connection_extra_and_hook_jdbc_params_compose_predictably(self, 
mock_get_connection):
+        mock_get_connection.return_value = 
get_hive_cli_connection(extra_dejson={"transport_mode": "binary"})
+
+        hook = HiveCliHook(
+            jdbc_params={
+                "transportMode": "http",
+                "sslTrustStore": "/opt/hive/truststore.jks",
+            }
+        )
+
+        assert hook._prepare_cli_cmd() == [
+            "beeline",
+            "-u",
+            
'"jdbc:hive2://localhost:10000/default;transportMode=http;sslTrustStore=/opt/hive/truststore.jks"',
+        ]
+
+    @pytest.mark.parametrize(
+        ("jdbc_params", "message"),
+        [
+            ({"transportMode;ssl": "http"}, "JDBC parameter names"),
+            ({"transportMode=ssl": "http"}, "JDBC parameter names"),
+            ({"transport Mode": "http"}, "JDBC parameter names"),
+            ({"transportMode": "http;ssl=true"}, "JDBC parameter 
'transportMode' value"),
+            ({"transportMode": None}, "JDBC parameter 'transportMode' value"),
+        ],
+    )
+    @mock.patch.object(HiveCliHook, "get_connection")
+    def test_invalid_jdbc_params_are_rejected(self, mock_get_connection, 
jdbc_params, message):
+        mock_get_connection.return_value = get_hive_cli_connection()
+        hook = HiveCliHook(jdbc_params=jdbc_params)
+
+        with pytest.raises(ValueError, match=message):
+            hook._prepare_cli_cmd()
+
+    @pytest.mark.parametrize("transport_mode", ["invalid", "http;ssl=true"])
+    @mock.patch.object(HiveCliHook, "get_connection")
+    def test_invalid_transport_mode_connection_extra_is_rejected(self, 
mock_get_connection, transport_mode):
+        mock_get_connection.return_value = get_hive_cli_connection(
+            extra_dejson={"transport_mode": transport_mode}
+        )
+        hook = HiveCliHook()
+
+        with pytest.raises(ValueError, match="transport_mode connection 
extra"):
+            hook._prepare_cli_cmd()

Review Comment:
   Maybe I missed it, but I saw no test covering the case where it is valid.



##########
providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py:
##########
@@ -212,6 +223,55 @@ def _prepare_cli_cmd(self) -> list[Any]:
 
         return [hive_bin, *cmd_extra, *hive_params_list]
 
+    def _get_jdbc_url_parameters(self) -> dict[str, str]:
+        jdbc_params = self._get_connection_jdbc_url_parameters()
+        
jdbc_params.update(self._validate_jdbc_url_parameters(self.jdbc_params))
+        return jdbc_params
+
+    def _get_connection_jdbc_url_parameters(self) -> dict[str, str]:
+        extra_dejson = getattr(self.conn, "extra_dejson", {})
+        transport_mode = extra_dejson.get("transport_mode")
+        if not transport_mode:
+            return {}
+        transport_mode = str(transport_mode).lower()
+        if transport_mode not in HIVE_CLI_TRANSPORT_MODES:
+            allowed_modes = ", ".join(sorted(HIVE_CLI_TRANSPORT_MODES))
+            raise ValueError(f"The transport_mode connection extra should be 
one of: {allowed_modes}")
+        return {"transportMode": transport_mode}

Review Comment:
   At least for the connection 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to