SkastVnT opened a new pull request, #66895:
URL: https://github.com/apache/airflow/pull/66895
## Summary
Closes #66839.
When Databricks SQL operators execute queries, nothing in
`system.query.history` identifies them as coming from Airflow, because no
session-level tags are attached. This PR injects Airflow context metadata into
the Databricks **`QUERY_TAGS`** session parameter so every query can be traced
back to the DAG/task that triggered it.
## Approach
Tags are serialised into the `key:value,key:value` format required by
Databricks and injected via `session_configuration={"QUERY_TAGS": "..."}`
inside `DatabricksSqlHook.get_conn()`. This is the correct mechanism — the
connector-level `query_tags=` parameter does **not** propagate to
`system.query.history`.
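
As a rough illustration of the injection step, the sketch below merges an already formatted tag string into a `session_configuration` dict. The helper name `build_session_configuration` is hypothetical, and appending the new tags after any existing `QUERY_TAGS` value is an assumption; the PR may order entries or resolve duplicate keys differently.

```python
from __future__ import annotations


def build_session_configuration(
    existing: dict[str, str] | None, formatted_tags: str
) -> dict[str, str]:
    """Return a copy of `existing` with `formatted_tags` merged into QUERY_TAGS.

    Hypothetical helper: the merge rule (existing tags first, new tags
    appended) is an assumption, not confirmed by the PR.
    """
    session_config = dict(existing or {})  # copy so the caller's dict is untouched
    current = session_config.get("QUERY_TAGS", "")
    # Join only the non-empty pieces to avoid stray leading/trailing commas.
    merged = ",".join(part for part in (current, formatted_tags) if part)
    if merged:
        session_config["QUERY_TAGS"] = merged
    return session_config


session_config = build_session_configuration(
    {"QUERY_TAGS": "team:data"}, "airflow_dag_id:example_dag"
)
```

The resulting dict is what would be handed to the connector as `sql.connect(..., session_configuration=session_config)`.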
### Key design decisions
| Decision | Rationale |
|---|---|
| Dict API at the operator level | Callers pass `query_tags={"key": "value"}`: readable, type-safe, mergeable |
| String serialisation at the hook level | A single place (`_format_query_tags`) owns the escaping/truncation rules |
| Value escaping | `\`, `,`, `:` are escaped; values truncated at 128 chars |
| Merge with existing `QUERY_TAGS` | Preserves any tags already set in `session_configuration` |
| `include_airflow_query_tags=False` | Opt-out for operators that don't want automatic Airflow context tags |
## Changes
### `providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py`
* Added `_format_query_tag_value(value)` — escape special chars, truncate at
128 chars.
* Added `_format_query_tags(tags)` — convert `dict[str, str | None]` →
`"key:value,..."`.
* `get_conn()` now builds a `session_config` dict, merges any incoming
`QUERY_TAGS` string with the formatted operator tags, and passes it as
`session_configuration` to `sql.connect()`.
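
The two formatting helpers described above might look roughly like the following. This is a minimal sketch, not the PR's code: the function names drop the leading underscore to mark them as stand-ins, and three details are assumptions: escaping is done by prefixing a backslash, truncation happens before escaping, and a `None` value is emitted as a bare key (the PR may instead drop the tag entirely).

```python
from __future__ import annotations

MAX_TAG_VALUE_LENGTH = 128  # truncation limit stated in the PR description


def format_query_tag_value(value: str) -> str:
    """Truncate a tag value, then backslash-escape `\\`, `,` and `:`."""
    truncated = value[:MAX_TAG_VALUE_LENGTH]  # assumption: truncate before escaping
    # Escape the backslash first so the escapes added below are not doubled.
    escaped = truncated.replace("\\", "\\\\")
    return escaped.replace(",", "\\,").replace(":", "\\:")


def format_query_tags(tags: dict[str, str | None]) -> str:
    """Serialise a tag dict into the `key:value,key:value` string form."""
    parts = []
    for key, value in tags.items():
        if value is None:
            parts.append(key)  # assumption: None means "key without a value"
        else:
            parts.append(f"{key}:{format_query_tag_value(value)}")
    return ",".join(parts)


formatted = format_query_tags({"team": "data", "note": "a,b:c"})
```

Keeping the escaping in one function means the operator layer can pass plain dicts and never needs to know about the wire format.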
### `providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py`
* Added module-level `_get_airflow_query_tags(context)` that returns a dict
of Airflow context keys (`airflow_dag_id`, `airflow_task_id`, `airflow_run_id`,
`airflow_try_number`, `airflow_map_index`).
* Both `DatabricksSqlOperator` and `DatabricksCopyIntoOperator` accept
`query_tags: dict[str, str | None]` and `include_airflow_query_tags: bool =
True`.
* On `execute()`, each operator merges its own `query_tags` with Airflow
context tags (if enabled) and assigns the result to `hook.query_tags`.
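
The operator-side merge can be sketched as below. The tag keys come from the list above; everything else is assumed for illustration: the function names are hypothetical, values are read from `context["ti"]`, all values are stringified, and user-supplied tags win when a key collides with an Airflow context tag.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any, Mapping


def get_airflow_query_tags(context: Mapping[str, Any]) -> dict[str, str]:
    """Collect Airflow context metadata as string tags (illustrative only)."""
    ti = context["ti"]  # assumption: values are taken from the task instance
    return {
        "airflow_dag_id": str(ti.dag_id),
        "airflow_task_id": str(ti.task_id),
        "airflow_run_id": str(ti.run_id),
        "airflow_try_number": str(ti.try_number),
        "airflow_map_index": str(ti.map_index),
    }


def resolve_query_tags(
    user_tags: dict[str, str | None] | None,
    context: Mapping[str, Any],
    include_airflow_query_tags: bool = True,
) -> dict[str, str | None]:
    """Merge operator-level tags with Airflow context tags."""
    tags: dict[str, str | None] = {}
    if include_airflow_query_tags:
        tags.update(get_airflow_query_tags(context))
    tags.update(user_tags or {})  # assumption: user tags override context tags
    return tags


# Stand-in for a real Airflow context, just enough to exercise the helpers.
fake_context = {
    "ti": SimpleNamespace(
        dag_id="example_dag", task_id="load", run_id="manual__1",
        try_number=1, map_index=-1,
    )
}
merged_tags = resolve_query_tags({"team": "data"}, fake_context)
```

In the operators, the merged dict would then be assigned to `hook.query_tags` before the connection is opened.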
### Tests
* `test_databricks_sql.py` (hooks): replaced old `query_tags=`
connector-kwarg tests with new `session_configuration["QUERY_TAGS"]`
assertions; added `TestFormatQueryTags` class covering value escaping,
truncation, None-omission, and round-trip correctness.
* `test_databricks_sql.py` (operators): existing tests unchanged — they
already test the dict-level API.
## Verification
Tested against a real Databricks workspace: all 4 tasks in the test DAG
completed successfully, and `QUERY_TAGS` appeared in `system.query.history` as
a `MAP<STRING,STRING>` with the expected Airflow keys.
## Checklist
- [x] My PR title and commit messages follow the [commit message
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines).
- [x] My changes are covered by unit tests.
- [x] I've run `ruff format` and `ruff check --fix` — all checks pass.
- [ ] I've updated the provider changelog / newsfragment (will add if
requested).