This is an automated email from the ASF dual-hosted git repository.

ashb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a7775a7974 Fix empty labels in Stackdriver log IO for Airflow 3 
Supervisor (#68292)
4a7775a7974 is described below

commit 4a7775a797472f521c4cc08b14cc2ffe7d7fdb52
Author: Taehoon Kim <[email protected]>
AuthorDate: Fri Jun 12 15:19:48 2026 +0900

    Fix empty labels in Stackdriver log IO for Airflow 3 Supervisor (#68292)
    
    This PR adds fallback logic to StackdriverRemoteLogIO to extract task 
labels directly from the structlog context dictionary when record.task_instance 
is missing, addressing the empty labels issue in the Airflow 3 Supervisor.
---
 .../google/cloud/log/stackdriver_task_handler.py   | 12 ++++++
 .../cloud/log/test_stackdriver_task_handler.py     | 45 ++++++++++++++++++++++
 2 files changed, 57 insertions(+)

diff --git 
a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
 
b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
index 41992d98ff2..dd184c230eb 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
@@ -171,6 +171,18 @@ class StackdriverRemoteLogIO(LoggingMixin):
                 labels.update(self.labels)
             if ti:
                 labels.update(_task_instance_to_labels(ti))
+            else:
+                if dag_id := event.get("dag_id"):
+                    labels[LABEL_DAG_ID] = str(dag_id)
+                if task_id := event.get("task_id"):
+                    labels[LABEL_TASK_ID] = str(task_id)
+                if run_id := event.get("run_id"):
+                    labels["run_id"] = str(run_id)
+                if try_number := event.get("try_number"):
+                    labels[LABEL_TRY_NUMBER] = str(try_number)
+                if map_index := event.get("map_index"):
+                    labels["map_index"] = str(map_index)
+
             _transport.send(record, str(msg.get("event", "")), 
resource=self.resource, labels=labels)
             return event
 
diff --git 
a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py 
b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py
index 0eb6c209f8f..da0068ae67b 100644
--- 
a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py
+++ 
b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py
@@ -189,6 +189,51 @@ class TestStackdriverRemoteLogIO:
 
         mock_transport.flush.assert_called_once()
 
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="airflow.sdk.log only 
exists in Airflow 3+")
+    @mock.patch(
+        
"airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverRemoteLogIO.transport",
+        new_callable=PropertyMock,
+    )
+    def test_processors_fallback_to_event_labels(self, mock_transport_prop):
+        mock_transport = mock.MagicMock()
+        mock_transport_prop.return_value = mock_transport
+
+        io = StackdriverRemoteLogIO(
+            base_log_folder=self.local_log_location,
+            gcp_log_name="airflow",
+        )
+        logger = mock.MagicMock()
+        # Mock relative_path_from_logger to return something truthy
+        with mock.patch(
+            "airflow.sdk.log.relative_path_from_logger",
+            return_value="some/path.py",
+        ):
+            proc = io.processors[0]
+            event = {
+                "event": "Test message",
+                "dag_id": "test_dag_id",
+                "task_id": "test_task_id",
+                "run_id": "test_run_id",
+                "try_number": 2,
+                "map_index": -1,
+            }
+
+            result = proc(logger, "info", event)
+
+            assert result == event
+
+            mock_transport.send.assert_called_once()
+            _, kwargs = mock_transport.send.call_args
+
+            labels = kwargs.get("labels", {})
+            assert labels == {
+                "dag_id": "test_dag_id",
+                "task_id": "test_task_id",
+                "run_id": "test_run_id",
+                "try_number": "2",
+                "map_index": "-1",
+            }
+
     
@mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")
     def test_prepare_log_filter(self, mock_get_creds_and_project_id):
         mock_get_creds_and_project_id.return_value = ("creds", "project_id")

Reply via email to