xBis7 commented on code in PR #43941:
URL: https://github.com/apache/airflow/pull/43941#discussion_r1883851900


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1751,6 +1946,19 @@ def _schedule_dag_run(
                     "The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id
                 )
                 return callback
+
+            if (
+                dag_run.scheduled_by_job_id is not None
+                and dag_run.scheduled_by_job_id != self.job.id
+                and self.active_dagrun_spans.get(dag_run.run_id) is None
+            ):
+                # If the dag_run has been previously scheduled by another job and there is no active span,
+                # then check if the job is still healthy.
+                # If it's not healthy, then recreate the spans.
+                self._recreate_unhealthy_scheduler_spans_if_needed(dag_run, session)
+
+            dag_run.set_scheduled_by_job_id(job_id=self.job.id, session=session, with_commit=False)

Review Comment:
   > Why are we resetting the scheduled_by here?
   
   `scheduled_by_job_id` records the id of the last scheduler job that processed the dag_run. When a different scheduler picks up the dag_run, it uses that id to check whether the previous scheduler is still healthy.
   
   We aren't clearing it here; we are updating it to point to the current job_id, so the next scheduler that picks up this dag_run can run the same check against us.
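
   To make the handoff concrete, here is a minimal standalone sketch of the idea. It is not the Airflow implementation: `DagRunStub`, `SchedulerStub`, `healthy_job_ids` and `recreated` are hypothetical names, and the health check is reduced to a set lookup purely for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class DagRunStub:
    """Hypothetical stand-in for a dag_run row."""
    run_id: str
    scheduled_by_job_id: Optional[int] = None


@dataclass
class SchedulerStub:
    """Hypothetical stand-in for a scheduler job runner."""
    job_id: int
    # Assumption: health is modelled as membership in a shared set.
    healthy_job_ids: Set[int]
    active_dagrun_spans: Dict[str, str] = field(default_factory=dict)
    recreated: List[str] = field(default_factory=list)

    def schedule(self, dag_run: DagRunStub) -> None:
        previous = dag_run.scheduled_by_job_id
        if (
            previous is not None
            and previous != self.job_id
            and self.active_dagrun_spans.get(dag_run.run_id) is None
        ):
            # Another scheduler handled this run before us and we hold no
            # span for it. Recreate the span only if that scheduler is
            # no longer healthy.
            if previous not in self.healthy_job_ids:
                self.active_dagrun_spans[dag_run.run_id] = f"span-{dag_run.run_id}"
                self.recreated.append(dag_run.run_id)

        # Always stamp the run with the current job id, so the next
        # scheduler can perform the same check against this one.
        dag_run.scheduled_by_job_id = self.job_id
```

   In this sketch, if scheduler 1 stamps a run and then dies, scheduler 2 sees `scheduled_by_job_id == 1`, finds that job unhealthy, recreates the span, and re-stamps the run with its own id. On its next loop the ids match, so the check is skipped.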



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
