shahar1 commented on code in PR #68246:
URL: https://github.com/apache/airflow/pull/68246#discussion_r3426264770
##########
providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py:
##########
@@ -278,6 +282,39 @@ def _read_single_logs_page(self, log_filter: str, page_token: str | None = None)
return "\n".join(messages), page.next_page_token
+def _labels_from_path(relative_path: str) -> dict[str, str]:
Review Comment:
should be renamed to `_extract_labels_from_path`
##########
providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py:
##########
@@ -182,7 +183,10 @@ def proc(
labels[LABEL_TRY_NUMBER] = str(try_number)
if map_index := event.get("map_index"):
labels["map_index"] = str(map_index)
-
+ # In AF3 supervisor context record.task_instance is not set.
+ # Parse labels from the structured log path as additional fallback.
+ path_labels = _labels_from_path(str(relative))
+ labels.update(path_labels)
Review Comment:
`labels.update(path_labels)` overrides the event-derived labels rather than acting as a "fallback".
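Here is a minimal sketch of a true fallback, assuming the intent is for event-derived labels to win. Path-derived values are added only for keys the event did not already set. The helper name `merge_path_labels_as_fallback` is hypothetical and not part of the PR. Inside `proc`, the same effect could come from calling `labels.setdefault(key, value)` for each path label instead of `labels.update(path_labels)`.

```python
def merge_path_labels_as_fallback(
    labels: dict[str, str], path_labels: dict[str, str]
) -> dict[str, str]:
    """Return event-derived ``labels`` with path labels filling only missing keys."""
    merged = dict(labels)  # copy so the caller's dict is left untouched
    for key, value in path_labels.items():
        # setdefault keeps an existing event-derived value and only adds absent keys
        merged.setdefault(key, value)
    return merged


# Usage: the event already knows the try number, so the path value is ignored.
event_labels = {"try_number": "2"}
path_labels = {"dag_id": "my_dag", "try_number": "1"}
print(merge_path_labels_as_fallback(event_labels, path_labels))
# → {'try_number': '2', 'dag_id': 'my_dag'}
```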
##########
providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py:
##########
@@ -278,6 +282,39 @@ def _read_single_logs_page(self, log_filter: str, page_token: str | None = None)
return "\n".join(messages), page.next_page_token
+def _labels_from_path(relative_path: str) -> dict[str, str]:
+ """Parse AF3 log path into Stackdriver labels.
+
+ AF3's log path template is::
+
+ dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log
+
+ All four label fields are extracted with zero DB access. When the path
+ does not match the expected format the function returns an empty dict
+ so callers can fall back to other label sources.
+ """
+ # Strip the trailing file extension (.log) and split into segments
+ stem = relative_path.rsplit(".", 1)[0] if "." in relative_path else relative_path
+ segments = stem.split("/")
+ labels: dict[str, str] = {}
+ for segment in segments:
+ if "=" not in segment:
+ continue
+ key, _, value = segment.partition("=")
+ if key == "dag_id":
+ labels[LABEL_DAG_ID] = value
+ elif key == "task_id":
+ labels[LABEL_TASK_ID] = value
+ elif key == "attempt":
+ labels[LABEL_TRY_NUMBER] = value
+ elif key == "run_id":
+ # run_id is NOT a standard Stackdriver label yet, but it is used
+ # on the write side via the log path template. Store it so the
+ # read path can filter on it (Bug 2 will wire this up).
+ pass
Review Comment:
Currently it seems like a no-op, so I'd rather drop this branch.