This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 28f56a007e106d1373c6c231a67b87c45b63697c
Author: Steve Zhang <[email protected]>
AuthorDate: Fri Oct 22 08:35:26 2021 -0700

    add detailed information to logging when a dag or a task finishes. (#19097)
    
    * add detailed information to logging when a dag or a task finishes.
    
    * make logging of start_date/end_date use ISO format to be consistent
    
    * fix pre-commit
    
    * use only %s in new logging statements to gracefully handle when certain variables are None
    
    * fix precommit
    
    * use self._state instead of get_state(). Add computation for dag run duration based on start_date and end_date
    
    * make linter happy with format
    
    * fix typo
    
    * put back missing reference to _state
    
    Co-authored-by: Daniel Imberman <[email protected]>
    (cherry picked from commit 324c31c2d7ad756ce3814f74f0b6654d02f19426)
---
 airflow/jobs/scheduler_job.py | 25 +++++++++++++++++++++++++
 airflow/models/dagrun.py      | 25 +++++++++++++++++++++++++
 2 files changed, 50 insertions(+)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index bf2410a..87dc158 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -533,6 +533,31 @@ class SchedulerJob(BaseJob):
                 self.log.info("Setting external_id for %s to %s", ti, info)
                 continue
 
+            msg = (
+                "TaskInstance Finished: dag_id=%s, task_id=%s, run_id=%s, "
+                "run_start_date=%s, run_end_date=%s, "
+                "run_duration=%s, state=%s, executor_state=%s, try_number=%s, 
max_tries=%s, job_id=%s, "
+                "pool=%s, queue=%s, priority_weight=%d, operator=%s"
+            )
+            self.log.info(
+                msg,
+                ti.dag_id,
+                ti.task_id,
+                ti.run_id,
+                ti.start_date,
+                ti.end_date,
+                ti.duration,
+                ti.state,
+                state,
+                try_number,
+                ti.max_tries,
+                ti.job_id,
+                ti.pool,
+                ti.queue,
+                ti.priority_weight,
+                ti.operator,
+            )
+
             if ti.try_number == buffer_key.try_number and ti.state == 
State.QUEUED:
                 Stats.incr('scheduler.tasks.killed_externally')
                 msg = (
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 1d53265..e8eb685 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -542,6 +542,31 @@ class DagRun(Base, LoggingMixin):
         else:
             self.set_state(State.RUNNING)
 
+        if self._state == State.FAILED or self._state == State.SUCCESS:
+            msg = (
+                "DagRun Finished: dag_id=%s, execution_date=%s, run_id=%s, "
+                "run_start_date=%s, run_end_date=%s, run_duration=%s, "
+                "state=%s, external_trigger=%s, run_type=%s, "
+                "data_interval_start=%s, data_interval_end=%s, dag_hash=%s"
+            )
+            self.log.info(
+                msg,
+                self.dag_id,
+                self.execution_date,
+                self.run_id,
+                self.start_date,
+                self.end_date,
+                (self.end_date - self.start_date).total_seconds()
+                if self.start_date and self.end_date
+                else None,
+                self._state,
+                self.external_trigger,
+                self.run_type,
+                self.data_interval_start,
+                self.data_interval_end,
+                self.dag_hash,
+            )
+
         
self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks)
         self._emit_duration_stats_for_finished_state()
 

Reply via email to