This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-9-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 24f7a89d96ce4b51a0a19d1a7e2e0c7f07b11e38
Author: RyuSA <[email protected]>
AuthorDate: Sat Apr 27 14:41:28 2024 +0900

    Make task log messages include run_id (#39280)
    
    * Make task log messages include run_id
    
    * apply format change (pre-commit)
    
    (cherry picked from commit b9773358a7da2eb4dc2eab4dca80a9b21655fcef)
---
 airflow/models/taskinstance.py          | 6 ++++--
 tests/cli/commands/test_task_command.py | 9 ++++++---
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 42824fa586..f154461a77 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1196,8 +1196,9 @@ def _log_state(*, task_instance: TaskInstance | TaskInstancePydantic, lead_msg:
         str(task_instance.state).upper(),
         task_instance.dag_id,
         task_instance.task_id,
+        task_instance.run_id,
     ]
-    message = "%sMarking task as %s. dag_id=%s, task_id=%s, "
+    message = "%sMarking task as %s. dag_id=%s, task_id=%s, run_id=%s, "
     if task_instance.map_index >= 0:
         params.append(task_instance.map_index)
         message += "map_index=%d, "
@@ -2486,9 +2487,10 @@ class TaskInstance(Base, LoggingMixin):
                     raise
                 self.defer_task(defer=defer, session=session)
                 self.log.info(
-                    "Pausing task as DEFERRED. dag_id=%s, task_id=%s, execution_date=%s, start_date=%s",
+                    "Pausing task as DEFERRED. dag_id=%s, task_id=%s, run_id=%s, execution_date=%s, start_date=%s",
                     self.dag_id,
                     self.task_id,
+                    self.run_id,
                     _date_or_empty(task_instance=self, attr="execution_date"),
                     _date_or_empty(task_instance=self, attr="start_date"),
                 )
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index a3cc88f3d3..3854fdfcfb 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -151,7 +151,10 @@ class TestCliTasks:
         args = self.parser.parse_args(["tasks", "test", self.dag_id, task_id, DEFAULT_DATE.isoformat()])
         with caplog.at_level("INFO", logger="airflow.task"):
             task_command.task_test(args)
-        assert f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}" in caplog.text
+        assert (
+            f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}, run_id={self.run_id}, "
+            in caplog.text
+        )
 
     @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
     def test_test_filters_secrets(self, capsys):
@@ -829,7 +832,7 @@ class TestLogsfromTaskRunCommand:
 
         assert (
             f"INFO - Marking task as SUCCESS. dag_id={self.dag_id}, "
-            f"task_id={self.task_id}, execution_date=20170101T000000" in logs
+            f"task_id={self.task_id}, run_id={self.run_id}, execution_date=20170101T000000" in logs
         )
 
     @unittest.skipIf(not hasattr(os, "fork"), "Forking not available")
@@ -869,7 +872,7 @@ class TestLogsfromTaskRunCommand:
         assert f"INFO - Running: ['airflow', 'tasks', 'run', '{self.dag_id}', '{self.task_id}'," in logs
         assert (
             f"INFO - Marking task as SUCCESS. dag_id={self.dag_id}, "
-            f"task_id={self.task_id}, execution_date=20170101T000000" in logs
+            f"task_id={self.task_id}, run_id={self.run_id}, execution_date=20170101T000000" in logs
         )
 
     def test_log_file_template_with_run_task(self):

Reply via email to