Vamsi-klu commented on code in PR #68144:
URL: https://github.com/apache/airflow/pull/68144#discussion_r3369970042
##########
providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py:
##########
@@ -212,6 +223,55 @@ def _prepare_cli_cmd(self) -> list[Any]:
return [hive_bin, *cmd_extra, *hive_params_list]
+ def _get_jdbc_url_parameters(self) -> dict[str, str]:
+ jdbc_params = self._get_connection_jdbc_url_parameters()
+
jdbc_params.update(self._validate_jdbc_url_parameters(self.jdbc_params))
+ return jdbc_params
+
+ def _get_connection_jdbc_url_parameters(self) -> dict[str, str]:
+ extra_dejson = getattr(self.conn, "extra_dejson", {})
+ transport_mode = extra_dejson.get("transport_mode")
+ if not transport_mode:
+ return {}
+ transport_mode = str(transport_mode).lower()
+ if transport_mode not in HIVE_CLI_TRANSPORT_MODES:
+ allowed_modes = ", ".join(sorted(HIVE_CLI_TRANSPORT_MODES))
+ raise ValueError(f"The transport_mode connection extra should be
one of: {allowed_modes}")
+ return {"transportMode": transport_mode}
+
+ @classmethod
+ def _validate_jdbc_url_parameters(cls, jdbc_params: Mapping[str, Any]) ->
dict[str, str]:
+ return {
+ cls._validate_jdbc_url_parameter_name(name):
cls._validate_jdbc_url_parameter_value(name, value)
+ for name, value in jdbc_params.items()
+ }
+
+ @staticmethod
+ def _validate_jdbc_url_parameter_name(name: str) -> str:
+ if not isinstance(name, str) or not
JDBC_PARAMETER_NAME_PATTERN.fullmatch(name):
+ raise ValueError(
+ "JDBC parameter names must be non-empty strings that start
with a letter and contain "
+ "only letters, digits, dots, underscores, or hyphens"
+ )
+ return name
+
+ @staticmethod
+ def _validate_jdbc_url_parameter_value(name: str, value: Any) -> str:
+ if value is None:
+ raise ValueError(f"JDBC parameter {name!r} value should not be
None")
+ value = str(value)
+ if ";" in value:
+ raise ValueError(f"JDBC parameter {name!r} value should not
contain the ';' character")
+ return value
Review Comment:
For the value, the only rules are "not `None`" and "no `;`". I kept the `;` rule as a plain `";" in str(value)` membership check inside the same method. I think that reads more clearly than a regex here, but I'm happy to switch to one if you'd prefer.
##########
providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py:
##########
@@ -212,6 +223,55 @@ def _prepare_cli_cmd(self) -> list[Any]:
return [hive_bin, *cmd_extra, *hive_params_list]
+ def _get_jdbc_url_parameters(self) -> dict[str, str]:
+ jdbc_params = self._get_connection_jdbc_url_parameters()
+
jdbc_params.update(self._validate_jdbc_url_parameters(self.jdbc_params))
+ return jdbc_params
+
+ def _get_connection_jdbc_url_parameters(self) -> dict[str, str]:
+ extra_dejson = getattr(self.conn, "extra_dejson", {})
+ transport_mode = extra_dejson.get("transport_mode")
+ if not transport_mode:
+ return {}
+ transport_mode = str(transport_mode).lower()
+ if transport_mode not in HIVE_CLI_TRANSPORT_MODES:
+ allowed_modes = ", ".join(sorted(HIVE_CLI_TRANSPORT_MODES))
+ raise ValueError(f"The transport_mode connection extra should be
one of: {allowed_modes}")
+ return {"transportMode": transport_mode}
+
+ @classmethod
+ def _validate_jdbc_url_parameters(cls, jdbc_params: Mapping[str, Any]) ->
dict[str, str]:
+ return {
+ cls._validate_jdbc_url_parameter_name(name):
cls._validate_jdbc_url_parameter_value(name, value)
+ for name, value in jdbc_params.items()
+ }
+
+ @staticmethod
+ def _validate_jdbc_url_parameter_name(name: str) -> str:
+ if not isinstance(name, str) or not
JDBC_PARAMETER_NAME_PATTERN.fullmatch(name):
+ raise ValueError(
+ "JDBC parameter names must be non-empty strings that start
with a letter and contain "
+ "only letters, digits, dots, underscores, or hyphens"
+ )
+ return name
+
+ @staticmethod
+ def _validate_jdbc_url_parameter_value(name: str, value: Any) -> str:
+ if value is None:
+ raise ValueError(f"JDBC parameter {name!r} value should not be
None")
+ value = str(value)
+ if ";" in value:
+ raise ValueError(f"JDBC parameter {name!r} value should not
contain the ';' character")
+ return value
+
+ @staticmethod
+ def _append_jdbc_url_parameters(jdbc_url: str, jdbc_params: Mapping[str,
str]) -> str:
+ if not jdbc_params:
+ return jdbc_url
+ jdbc_url_params = ";".join(f"{name}={value}" for name, value in
jdbc_params.items())
+ separator = "" if jdbc_url.endswith(";") else ";"
+ return f"{jdbc_url}{separator}{jdbc_url_params}"
Review Comment:
Good question. A DAG author can't edit the JDBC URL directly. `_prepare_cli_cmd` assembles it programmatically from the connection's host/port/schema/auth (plus principal/proxy_user and the HA settings). `_validate_beeline_parameters` also rejects `;` in the schema, so there is no raw-URL field that params could be appended to. `jdbc_params` is the sanctioned, validated place for them:
- each name must fully match `JDBC_PARAMETER_NAME_PATTERN`;
- each value must be non-`None` and must not contain `;`.
The use case driving this (issue #45049) is Cloudera CDP. The same base `hive_cli` connection is used against clusters that need parameters such as `transportMode=http`, `httpPath`, `ssl`, `sslTrustStore`, `trustStorePassword`, and `AuthMech`. These values vary by target cluster rather than belonging on one shared connection. A per-task `jdbc_params` is therefore more practical than maintaining a separate connection per variant. A connection-level default for `transportMode` can still come from the `transport_mode` extra; per-task `jdbc_params` are merged on top of it and take precedence.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]