howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518475273


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1415,70 +1478,101 @@ def _schedule_dag_run(
         :param dag_run: The DagRun to schedule
         :return: Callback that needs to be executed
         """
-        callback: DagCallbackRequest | None = None
+        trace_id = int(trace_utils.gen_trace_id(dag_run=dag_run), 16)
+        span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run), 16)
+        links = [{"trace_id": trace_id, "span_id": span_id}]
+
+        with Trace.start_span(
+            span_name="_schedule_dag_run", component="SchedulerJobRunner", 
links=links
+        ) as s:
+            s.set_attribute("dag_id", dag_run.dag_id)
+            s.set_attribute("run_id", dag_run.run_id)
+            s.set_attribute("run_type", dag_run.run_type)
+
+            callback: DagCallbackRequest | None = None
+
+            dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, 
session=session)
+            dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+
+            if not dag or not dag_model:
+                self.log.error("Couldn't find DAG %s in DAG bag or database!", 
dag_run.dag_id)
+                return callback
+
+            if (
+                dag_run.start_date
+                and dag.dagrun_timeout
+                and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
+            ):
+                dag_run.set_state(DagRunState.FAILED)
+                unfinished_task_instances = session.scalars(
+                    select(TI)
+                    .where(TI.dag_id == dag_run.dag_id)
+                    .where(TI.run_id == dag_run.run_id)
+                    .where(TI.state.in_(State.unfinished))
+                )
+                for task_instance in unfinished_task_instances:
+                    task_instance.state = TaskInstanceState.SKIPPED
+                    session.merge(task_instance)
+                session.flush()
+                self.log.info("Run %s of %s has timed-out", dag_run.run_id, 
dag_run.dag_id)
+
+                if self._should_update_dag_next_dagruns(
+                    dag, dag_model, last_dag_run=dag_run, session=session
+                ):
+                    dag_model.calculate_dagrun_date_fields(dag, 
dag.get_run_data_interval(dag_run))
+
+                callback_to_execute = DagCallbackRequest(
+                    full_filepath=dag.fileloc,
+                    dag_id=dag.dag_id,
+                    run_id=dag_run.run_id,
+                    is_failure_callback=True,
+                    processor_subdir=dag_model.processor_subdir,
+                    msg="timed_out",
+                )
 
-        dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, 
session=session)
-        dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+                dag_run.notify_dagrun_state_changed()
+                duration = dag_run.end_date - dag_run.start_date
+                Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", 
duration)
+                Stats.timing("dagrun.duration.failed", duration, 
tags={"dag_id": dag_run.dag_id})
+
+                s.set_attribute("error", True)
+                s.add_event(
+                    name="error",
+                    attributes={
+                        "message": f"Run {dag_run.run_id} of {dag_run.dag_id} 
has timed-out",
+                        "duration": str(duration),
+                    },
+                )
+                return callback_to_execute
 
-        if not dag or not dag_model:
-            self.log.error("Couldn't find DAG %s in DAG bag or database!", 
dag_run.dag_id)
-            return callback
+            if dag_run.execution_date > timezone.utcnow() and not 
dag.allow_future_exec_dates:
+                self.log.error("Execution date is in future: %s", 
dag_run.execution_date)
+                return callback
 
-        if (
-            dag_run.start_date
-            and dag.dagrun_timeout
-            and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
-        ):
-            dag_run.set_state(DagRunState.FAILED)
-            unfinished_task_instances = session.scalars(
-                select(TI)
-                .where(TI.dag_id == dag_run.dag_id)
-                .where(TI.run_id == dag_run.run_id)
-                .where(TI.state.in_(State.unfinished))
-            )
-            for task_instance in unfinished_task_instances:
-                task_instance.state = TaskInstanceState.SKIPPED
-                session.merge(task_instance)
-            session.flush()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, 
dag_run.dag_id)
+            if not self._verify_integrity_if_dag_changed(dag_run=dag_run, 
session=session):
+                self.log.warning(
+                    "The DAG disappeared before verifying integrity: %s. 
Skipping.", dag_run.dag_id
+                )
+                return callback
+            # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something 
else?
+            schedulable_tis, callback_to_run = 
dag_run.update_state(session=session, execute_callbacks=False)
 
             if self._should_update_dag_next_dagruns(dag, dag_model, 
last_dag_run=dag_run, session=session):
                 dag_model.calculate_dagrun_date_fields(dag, 
dag.get_run_data_interval(dag_run))
-
-            callback_to_execute = DagCallbackRequest(
-                full_filepath=dag.fileloc,
-                dag_id=dag.dag_id,
-                run_id=dag_run.run_id,
-                is_failure_callback=True,
-                processor_subdir=dag_model.processor_subdir,
-                msg="timed_out",
+            # This will do one query per dag run. We "could" build up a complex
+            # query to update all the TIs across all the execution dates and 
dag
+            # IDs in a single query, but it turns out that can be _very very 
slow_
+            # see #11147/commit ee90807ac for more details
+            _schedulable_ti_ids = []
+            for _ti in schedulable_tis:
+                _schedulable_ti_ids.append(_ti.task_id)

Review Comment:
   Something like this is a nicer, more elegant way for me to express this
code, e.g. `_schedulable_ti_ids = [ti.task_id for ti in schedulable_tis]`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to