SameerMesiah97 commented on code in PR #66886:
URL: https://github.com/apache/airflow/pull/66886#discussion_r3238298731
##########
providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py:
##########
@@ -46,6 +46,46 @@
_DISALLOWED_SQL_TOKENS = (";", "--", "/*", "*/")
+def _escape_query_tag_value(value: str) -> str:
+ """Escape Databricks query-tag separators in a tag value."""
+ return str(value).replace("\\", "\\\\").replace(",", "\\,").replace(":",
"\\:")
+
+
+def _format_query_tags(context: Context) -> str:
+ """Format Airflow context metadata into databricks-sql-connector query
tags."""
+ tags = []
+ if "dag" in context and getattr(context["dag"], "dag_id", None):
+
tags.append(f"airflow_dag_id:{_escape_query_tag_value(context['dag'].dag_id)}")
+ if "task" in context and getattr(context["task"], "task_id", None):
+
tags.append(f"airflow_task_id:{_escape_query_tag_value(context['task'].task_id)}")
+ if "run_id" in context and context["run_id"]:
+
tags.append(f"airflow_run_id:{_escape_query_tag_value(context['run_id'])}")
+
+ return ",".join(tags)
Review Comment:
I think this could be made clearer and more explicit via a mapping-driven
approach. Please see the example below for guidance:
```
_QUERY_TAG_FIELDS = {
"airflow_dag_id": ("dag", "dag_id"),
"airflow_task_id": ("task", "task_id"),
"airflow_run_id": ("run_id", None),
}
def _format_query_tags(context: Context) -> str:
tags = []
for tag_name, (context_key, attr) in _QUERY_TAG_FIELDS.items():
value = context.get(context_key)
if attr:
value = getattr(value, attr, None)
if value:
tags.append(f"{tag_name}:{_escape_query_tag_value(value)}")
return ",".join(tags)
```
Also, you could do the same for `_escape_query_tag_value`:
```
_QUERY_TAG_ESCAPE_SEQUENCES = {
"\\": "\\\\",
",": "\\,",
":": "\\:",
}
def _escape_query_tag_value(value: str) -> str:
"""Escape Databricks query-tag separator characters in a tag value."""
escaped = str(value)
for char, replacement in _QUERY_TAG_ESCAPE_SEQUENCES.items():
escaped = escaped.replace(char, replacement)
return escaped
```
##########
providers/databricks/tests/unit/databricks/operators/test_databricks_copy.py:
##########
@@ -522,3 +522,34 @@ def test_get_openlineage_facets():
"externalQuery": ExternalQueryRunFacet(externalQueryId="query_id",
source="scheme://host")
}
assert result.job_facets == {"sql": SQLJobFacet(query=op._sql)}
+
+
Review Comment:
I think this test should be moved above the openlineage tests as it is
closer to core execution logic. Also, the tests only cover the happy path.
Could we add coverage for empty/partial context, empty existing query_tags,
and values containing commas/colons/backslashes?
The escaping path is probably the most important one here since
`_escape_query_tag_value()` is not really exercised end-to-end right now. It
would also be good to verify that unrelated `session_configuration` values are
preserved after the merge.
##########
providers/databricks/tests/unit/databricks/operators/test_databricks_sql.py:
##########
@@ -453,3 +453,32 @@ def test_parse_gcs_path():
bucket, object_name =
op._parse_gcs_path("gs://my-bucket/path/to/file.parquet")
assert bucket == "my-bucket"
assert object_name == "path/to/file.parquet"
+
+
Review Comment:
I would move this test after `test_exec_write_gcs_parquet_output`. Also, the
above comment regarding incomplete test coverage applies here too.
##########
providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py:
##########
@@ -153,6 +193,10 @@ def _hook(self) -> DatabricksSqlHook:
def get_db_hook(self) -> DatabricksSqlHook:
return self._hook
+ def execute(self, context: Context) -> Any:
+ _inject_query_tags(self.get_db_hook(), context)
+ return super().execute(context)
Review Comment:
Do we want this behavior to be configurable (e.g. an operator- or
provider-level opt-out)? Since this mutates `session_configuration`
automatically, some users may prefer explicit control over injected warehouse
metadata.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]