moomindani commented on code in PR #66895:
URL: https://github.com/apache/airflow/pull/66895#discussion_r3263752278
##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py:
##########
@@ -172,17 +202,27 @@ def get_conn(self) -> AirflowConnection:
if not self._sql_conn or prev_token != new_token:
if self._sql_conn: # close already existing connection
self._sql_conn.close()
+ session_config: dict[str, str] = dict(self.session_config) if self.session_config else {}
+ if self.query_tags:
+ tags_str = _format_query_tags(self.query_tags)
+ existing = session_config.get("QUERY_TAGS", "")
+ session_config["QUERY_TAGS"] = f"{existing},{tags_str}" if existing else tags_str
+
+ connect_kwargs = {
Review Comment:
Minor fragility: `get_conn()` only rebuilds `_sql_conn` when `not
self._sql_conn or prev_token != new_token`. If anything triggers `get_conn()`
on this hook *before* `execute()` sets `hook.query_tags`, the cached connection
(without the new `QUERY_TAGS` session config) is returned and the new tags
never take effect.
For the current operator flow (cached_property `_hook` per operator
instance, single `execute()` per task) this is safe. But it's quietly
load-bearing on "nothing else calls `get_conn()` first" — a one-line comment
near the `prev_token != new_token` check noting that session-config changes
require a reconnect would protect against a future refactor regressing it.
Alternatively, take `query_tags` at hook construction time so it can never
change after `_sql_conn` is created.
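
A minimal sketch of one way to make the reconnect condition explicit, assuming the session config can be folded into a cache key next to the token. `FakeConnection`, `SketchHook`, and `_conn_key` are hypothetical names for illustration, not part of the provider:

```python
from typing import Optional


class FakeConnection:
    """Stand-in for a SQL connection; records the session config it was opened with."""

    def __init__(self, session_config: dict) -> None:
        self.session_config = dict(session_config)
        self.closed = False

    def close(self) -> None:
        self.closed = True


class SketchHook:
    """Hypothetical hook that reconnects when the token or the session config changes."""

    def __init__(self, query_tags: Optional[dict] = None) -> None:
        self.query_tags = query_tags
        self.token = "token-1"  # placeholder for the real auth token
        self._sql_conn: Optional[FakeConnection] = None
        self._conn_key: Optional[tuple] = None

    def _session_config(self) -> dict:
        if not self.query_tags:
            return {}
        tags = ",".join(f"{k}:{v}" for k, v in sorted(self.query_tags.items()))
        return {"QUERY_TAGS": tags}

    def get_conn(self) -> FakeConnection:
        session_config = self._session_config()
        # Session-config changes require a reconnect, so they are part of the key.
        key = (self.token, tuple(sorted(session_config.items())))
        if self._sql_conn is None or key != self._conn_key:
            if self._sql_conn is not None:
                self._sql_conn.close()  # close the stale connection first
            self._sql_conn = FakeConnection(session_config)
            self._conn_key = key
        return self._sql_conn
```

With this shape, setting `query_tags` after an early `get_conn()` call still produces a connection that carries the tags, at the cost of one extra reconnect.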
##########
providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py:
##########
@@ -46,6 +46,22 @@
_DISALLOWED_SQL_TOKENS = (";", "--", "/*", "*/")
+def _get_airflow_query_tags(context: Context) -> dict[str, str | None]:
+ """Return Airflow context metadata as a query-tags dict."""
+ task_instance = context["ti"]
Review Comment:
`context["ti"]` will raise `KeyError` if a caller passes a context dict that
doesn't contain `"ti"`. In normal Airflow runtime the key is always there, but
the operator-side `_get_query_tags` already takes care to guard `context is not
None`, and the test suite passes a `mock_context = {"ti": object()}` to
exercise this path. It'd be consistent to also handle the no-`ti` case
gracefully (`context.get("ti")` and bail out if `None`). Then this helper
becomes safe to call from anywhere.
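
Roughly what I have in mind, as a sketch only. The function name and the three attribute names read from the task instance are assumptions for illustration; the real helper may collect different fields:

```python
from typing import Any, Mapping, Optional


def get_airflow_query_tags_sketch(context: Optional[Mapping[str, Any]]) -> dict:
    """Return task metadata as tags, or an empty dict when no task instance is available."""
    if context is None:
        return {}
    task_instance = context.get("ti")
    if task_instance is None:
        # No task instance in the context: bail out instead of raising KeyError.
        return {}
    # getattr with a default tolerates stand-ins such as a bare object() in tests.
    return {
        "dag_id": getattr(task_instance, "dag_id", None),
        "task_id": getattr(task_instance, "task_id", None),
        "run_id": getattr(task_instance, "run_id", None),
    }
```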
##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py:
##########
@@ -71,6 +71,30 @@ def _cancel():
return timer, timeout_event
+def _format_query_tag_value(value: str) -> str:
+ """
+ Escape special characters and truncate a single query tag value.
+
+ Databricks ``QUERY_TAGS`` uses ``key:value`` pairs delimited by commas, so
+ backslash, comma and colon inside *values* must be escaped. Values are also
+ capped at 128 characters before escaping to keep the overall tag string
+ within reasonable bounds.
+ """
+ value = str(value)[:128]
Review Comment:
Two small issues with the truncation here:
1. **Silent.** Long Airflow values (some `run_id` formats are ~60 chars;
custom run_ids / user-supplied tags can be longer) get cut off without any log
message, which is hard to debug from the analytics side. A
`self.log.warning(...)` on the call site, or a `log.warning(...)` from a module
logger here when truncation actually fires, would be very cheap to add.
2. **Order vs. final escaped length.** Truncation runs *before* escaping, so
the escaped value can exceed 128 chars. If the goal is "cap the source string
at 128 chars" the current order is fine; if the goal is "keep the final escaped
value ≤ 128 chars on the wire" the order should flip. Worth a brief comment
stating which contract this function is keeping.
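
To make the two contracts concrete, here is a hedged sketch that logs on truncation and supports either ordering. The backslash-prefix escaping, the `cap_escaped` flag, and the function names are my assumptions, not what the PR implements:

```python
import logging

log = logging.getLogger(__name__)

MAX_TAG_VALUE_LENGTH = 128


def _escape(value: str) -> str:
    # Escape the backslash first so the escapes added afterwards are not doubled.
    return value.replace("\\", "\\\\").replace(",", "\\,").replace(":", "\\:")


def format_query_tag_value_sketch(value: object, cap_escaped: bool = False) -> str:
    text = str(value)
    if not cap_escaped:
        # Contract A: cap the *source* string; the escaped result may be longer.
        truncated = text[:MAX_TAG_VALUE_LENGTH]
        if len(truncated) < len(text):
            log.warning("Query tag value truncated from %d to %d chars", len(text), len(truncated))
        return _escape(truncated)
    # Contract B: cap the *escaped* string, adding one escaped character at a
    # time so an escape sequence is never split in the middle.
    pieces = []
    length = 0
    for index, char in enumerate(text):
        escaped_char = _escape(char)
        if length + len(escaped_char) > MAX_TAG_VALUE_LENGTH:
            log.warning("Query tag value truncated after %d of %d chars", index, len(text))
            break
        pieces.append(escaped_char)
        length += len(escaped_char)
    return "".join(pieces)
```

A value of 200 colons shows the difference: contract A yields 256 escaped characters, contract B stays at 128.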
##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py:
##########
@@ -103,6 +131,7 @@ def __init__(
http_headers: list[tuple[str, str]] | None = None,
catalog: str | None = None,
schema: str | None = None,
+ query_tags: dict[str, str | None] | None = None,
Review Comment:
Inserting `query_tags` *before* `caller` is a backwards-incompatible change
to the public hook constructor signature — anything passing `caller`
positionally now silently reinterprets that string as `query_tags`. The PR
already had to fix this for the in-repo sensors (the `caller=self.caller`
keyword edits below), but third-party code calling `DatabricksSqlHook(...,
"MyHook")` positionally will break with a confusing type/behaviour error rather
than a clean failure.
Moving `query_tags` to *after* `caller` (or making it kwarg-only) removes
the hazard entirely, and the sensor `caller=self.caller` change becomes
optional rather than required. Worth doing either way.
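
A sketch of the kwarg-only variant, using a deliberately simplified, hypothetical constructor (`SketchSqlHook` and its parameter list are not the real `DatabricksSqlHook` signature):

```python
from typing import Optional


class SketchSqlHook:
    """Hypothetical hook showing a new parameter added as keyword-only."""

    def __init__(
        self,
        conn_id: str = "databricks_default",
        schema: Optional[str] = None,
        caller: str = "SketchSqlHook",
        *,  # everything after this marker must be passed by keyword
        query_tags: Optional[dict] = None,
    ) -> None:
        self.conn_id = conn_id
        self.schema = schema
        self.caller = caller
        self.query_tags = query_tags
```

Existing positional callers keep working unchanged, and an accidental fourth positional argument fails immediately with `TypeError` instead of being reinterpreted.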