goingforstudying-ctrl commented on code in PR #68246:
URL: https://github.com/apache/airflow/pull/68246#discussion_r3438759790


##########
providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py:
##########
@@ -182,7 +183,10 @@ def proc(
                     labels[LABEL_TRY_NUMBER] = str(try_number)
                 if map_index := event.get("map_index"):
                     labels["map_index"] = str(map_index)
-
+                # In AF3 supervisor context record.task_instance is not set.
+                # Parse labels from the structured log path as additional fallback.
+                path_labels = _labels_from_path(str(relative))
+                labels.update(path_labels)

Review Comment:
   Fixed — reworded the comment to avoid the 'fallback' framing. The path 
labels are now described as filling in missing keys rather than overriding 
event-derived ones.



##########
providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py:
##########
@@ -278,6 +282,39 @@ def _read_single_logs_page(self, log_filter: str, page_token: str | None = None)
         return "\n".join(messages), page.next_page_token
 
 
+def _labels_from_path(relative_path: str) -> dict[str, str]:

Review Comment:
   Fixed — renamed the function to `_extract_labels_from_path` as requested.



##########
providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py:
##########
@@ -278,6 +282,39 @@ def _read_single_logs_page(self, log_filter: str, page_token: str | None = None)
         return "\n".join(messages), page.next_page_token
 
 
+def _labels_from_path(relative_path: str) -> dict[str, str]:
+    """Parse AF3 log path into Stackdriver labels.
+
+    AF3's log path template is::
+
+        dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log
+
+    All four label fields are extracted with zero DB access.  When the path
+    does not match the expected format the function returns an empty dict
+    so callers can fall back to other label sources.
+    """
+    # Strip the trailing file extension (.log) and split into segments
+    stem = relative_path.rsplit(".", 1)[0] if "." in relative_path else relative_path
+    segments = stem.split("/")
+    labels: dict[str, str] = {}
+    for segment in segments:
+        if "=" not in segment:
+            continue
+        key, _, value = segment.partition("=")
+        if key == "dag_id":
+            labels[LABEL_DAG_ID] = value
+        elif key == "task_id":
+            labels[LABEL_TASK_ID] = value
+        elif key == "attempt":
+            labels[LABEL_TRY_NUMBER] = value
+        elif key == "run_id":
+            # run_id is NOT a standard Stackdriver label yet, but it is used
+            # on the write side via the log path template.  Store it so the
+            # read path can filter on it (Bug 2 will wire this up).
+            pass

Review Comment:
   Fixed — removed the no-op branch as requested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
