This is an automated email from the ASF dual-hosted git repository.
ashb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4a7775a7974 Fix empty labels in Stackdriver log IO for Airflow 3 Supervisor (#68292)
4a7775a7974 is described below
commit 4a7775a797472f521c4cc08b14cc2ffe7d7fdb52
Author: Taehoon Kim <[email protected]>
AuthorDate: Fri Jun 12 15:19:48 2026 +0900
Fix empty labels in Stackdriver log IO for Airflow 3 Supervisor (#68292)
This PR adds fallback logic to StackdriverRemoteLogIO to extract task
labels directly from the structlog context dictionary when record.task_instance
is missing, addressing the empty labels issue in the Airflow 3 Supervisor.
---
.../google/cloud/log/stackdriver_task_handler.py | 12 ++++++
.../cloud/log/test_stackdriver_task_handler.py | 45 ++++++++++++++++++++++
2 files changed, 57 insertions(+)
diff --git a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
index 41992d98ff2..dd184c230eb 100644
--- a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
+++ b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
@@ -171,6 +171,18 @@ class StackdriverRemoteLogIO(LoggingMixin):
labels.update(self.labels)
if ti:
labels.update(_task_instance_to_labels(ti))
+ else:  # no task instance on the record: build the labels from the event dict instead
+ if dag_id := event.get("dag_id"):
+ labels[LABEL_DAG_ID] = str(dag_id)
+ if task_id := event.get("task_id"):
+ labels[LABEL_TASK_ID] = str(task_id)
+ if run_id := event.get("run_id"):
+ labels["run_id"] = str(run_id)
+ if (try_number := event.get("try_number")) is not None:  # compare to None so a value of 0 is kept
+ labels[LABEL_TRY_NUMBER] = str(try_number)
+ if (map_index := event.get("map_index")) is not None:  # 0 is a valid index and must still be labelled
+ labels["map_index"] = str(map_index)
+
_transport.send(record, str(msg.get("event", "")),
resource=self.resource, labels=labels)
return event
diff --git a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py
index 0eb6c209f8f..da0068ae67b 100644
--- a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py
+++ b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py
@@ -189,6 +189,51 @@ class TestStackdriverRemoteLogIO:
mock_transport.flush.assert_called_once()
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="airflow.sdk.log only
exists in Airflow 3+")
+ @mock.patch(
+
"airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverRemoteLogIO.transport",
+ new_callable=PropertyMock,  # `transport` is patched as a property, so no real transport is built
+ )
+ def test_processors_fallback_to_event_labels(self, mock_transport_prop):  # labels must be taken from the event dict
+ mock_transport = mock.MagicMock()
+ mock_transport_prop.return_value = mock_transport  # reading io.transport now yields the mock
+
+ io = StackdriverRemoteLogIO(
+ base_log_folder=self.local_log_location,
+ gcp_log_name="airflow",
+ )
+ logger = mock.MagicMock()
+ # Mock relative_path_from_logger to return something truthy
+ with mock.patch(
+ "airflow.sdk.log.relative_path_from_logger",
+ return_value="some/path.py",
+ ):
+ proc = io.processors[0]  # called below with (logger, "info", event)
+ event = {
+ "event": "Test message",
+ "dag_id": "test_dag_id",
+ "task_id": "test_task_id",
+ "run_id": "test_run_id",
+ "try_number": 2,
+ "map_index": -1,  # NOTE(review): a map_index of 0 is not exercised by this test
+ }
+
+ result = proc(logger, "info", event)
+
+ assert result == event  # the processor is expected to hand the event back
+
+ mock_transport.send.assert_called_once()
+ _, kwargs = mock_transport.send.call_args  # labels are read from send()'s keyword arguments
+
+ labels = kwargs.get("labels", {})
+ assert labels == {
+ "dag_id": "test_dag_id",
+ "task_id": "test_task_id",
+ "run_id": "test_run_id",
+ "try_number": "2",  # numeric event values are expected as strings
+ "map_index": "-1",
+ }
+
@mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")
def test_prepare_log_filter(self, mock_get_creds_and_project_id):
mock_get_creds_and_project_id.return_value = ("creds", "project_id")