ashb commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r515934876
##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,7 +1136,59 @@ def _run_raw_task(
if not test_mode:
session.add(Log(self.state, self))
session.merge(self)
+
session.commit()
+ if conf.getboolean('scheduler', 'schedule_after_task_execution',
fallback=True):
+ from airflow.models.dagrun import DagRun # Avoid circular import
+
+ try:
+ # Re-select the row with a lock
+ dag_run = with_row_locks(session.query(DagRun).filter_by(
+ dag_id=self.dag_id,
+ execution_date=self.execution_date,
+ )).one()
+
+ # Get a partial dag with just the specific tasks we want to
+ # examine. In order for dep checks to work correctly, we
+ # include ourself (so TriggerRuleDep can check the state of the
+ # task we just executed)
+ tasks = self.task.downstream_list + [self.task]
+ task_ids = [t.task_id for t in tasks]
+
+ partial_dag = self.task.dag.partial_subset(
+ task_ids,
+ include_downstream=True,
Review comment:
Hmmmm, this is going to include _all_ downstream tasks, recursively, not
just the tasks we've selected.
##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,7 +1136,53 @@ def _run_raw_task(
if not test_mode:
session.add(Log(self.state, self))
session.merge(self)
+
session.commit()
+ if conf.getboolean('scheduler', 'schedule_after_task_execution',
fallback=True):
+ from airflow.models.dagrun import DagRun # Avoid circular import
+
+ try:
+ # Re-select the row with a lock
+ dag_run = with_row_locks(session.query(DagRun).filter_by(
+ dag_id=self.dag_id,
+ execution_date=self.execution_date,
+ )).one()
+
+ # Get a partial dag with just the specific tasks we want to
+ # examine. In order for dep checks to work correctly, we
+ # include ourself (so TriggerRuleDep can check the state of the
+ # task we just executed)
+ partial_dag = self.task.dag.partial_subset(
+ self.task.downstream_task_ids,
+ include_downstream=False,
+ include_upstream=False,
+ include_direct_upstream=True,
+ )
+
+ dag_run.dag = partial_dag
+ info = dag_run.task_instance_scheduling_decisions(session)
+
+ skippable_task_ids = {
+ task_id
+ for task_id in partial_dag.task_ids
+ if task_id not in self.task.downstream_task_ids
+ }
+
+ schedulable_tis = [
+ ti for ti in info.schedulable_tis if ti.task_id not in
skippable_task_ids
+ ]
+ for schedulable_ti in schedulable_tis:
+ if not hasattr(schedulable_ti, "task"):
+ schedulable_ti.task =
self.task.dag.get_task(schedulable_ti.task_id)
+
+ num = dag_run.schedule_tis(schedulable_tis)
+ self.log.info("%d downstream tasks scheduled from follow-on
schedule check", num)
+
+ session.commit()
+ except OperationalError:
+ # Any kind of DB error here is _non fatal_ as this block is
just an optimisation.
+ self.log.info("DB error when checking downstream tasks
ignored", exc_info=True)
Review comment:
I think it depends on what the exception was.
##########
File path: airflow/models/dagrun.py
##########
@@ -638,3 +674,52 @@ def get_latest_runs(cls, session=None):
.all()
)
return dagruns
+
+ @provide_session
+ def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session =
None) -> int:
+ """
+ Set the given task instances into the scheduled state.
+
+ Each element of ``schedulable_tis`` should have its ``task``
attribute already set.
+
+ Any DummyOperator without callbacks is instead set straight to the
success state.
+
+ All the TIs should belong to this DagRun, but this code is in the
hot-path, this is not checked -- it
+ is the caller's responsibility to call this function only with TIs
from a single dag run.
+ """
+ # Get list of TIs that do not need to be executed, these are
+ # tasks using DummyOperator and without on_execute_callback /
on_success_callback
+ dummy_tis = {
+ ti for ti in schedulable_tis
+ if
+ (
+ ti.task.task_type == "DummyOperator"
Review comment:
No, there is already an issue for that though
##########
File path: airflow/models/dagrun.py
##########
@@ -43,6 +43,20 @@
from airflow.utils.types import DagRunType
+class TISchedulingDecision(NamedTuple):
+ """
+ Type of return for DagRun.task_instance_scheduling_decisions
+
+ This is only used by type checkers, at run time this is a plain dict.
Review comment:
No, not any more. Good catch
##########
File path: airflow/models/dagrun.py
##########
@@ -638,3 +674,52 @@ def get_latest_runs(cls, session=None):
.all()
)
return dagruns
+
+ @provide_session
+ def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session =
None) -> int:
+ """
+ Set the given task instances into the scheduled state.
+
+ Each element of ``schedulable_tis`` should have its ``task``
attribute already set.
+
+ Any DummyOperator without callbacks is instead set straight to the
success state.
+
+ All the TIs should belong to this DagRun, but this code is in the
hot-path, this is not checked -- it
+ is the caller's responsibility to call this function only with TIs
from a single dag run.
+ """
+ # Get list of TIs that do not need to be executed, these are
+ # tasks using DummyOperator and without on_execute_callback /
on_success_callback
+ dummy_tis = {
+ ti for ti in schedulable_tis
+ if
+ (
+ ti.task.task_type == "DummyOperator"
Review comment:
#11393
##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,7 +1136,53 @@ def _run_raw_task(
if not test_mode:
session.add(Log(self.state, self))
session.merge(self)
+
session.commit()
+ if conf.getboolean('scheduler', 'schedule_after_task_execution',
fallback=True):
+ from airflow.models.dagrun import DagRun # Avoid circular import
+
+ try:
+ # Re-select the row with a lock
+ dag_run = with_row_locks(session.query(DagRun).filter_by(
+ dag_id=self.dag_id,
+ execution_date=self.execution_date,
+ )).one()
+
+ # Get a partial dag with just the specific tasks we want to
+ # examine. In order for dep checks to work correctly, we
+ # include ourself (so TriggerRuleDep can check the state of the
+ # task we just executed)
+ partial_dag = self.task.dag.partial_subset(
+ self.task.downstream_task_ids,
+ include_downstream=False,
+ include_upstream=False,
+ include_direct_upstream=True,
+ )
+
+ dag_run.dag = partial_dag
+ info = dag_run.task_instance_scheduling_decisions(session)
+
+ skippable_task_ids = {
+ task_id
+ for task_id in partial_dag.task_ids
+ if task_id not in self.task.downstream_task_ids
+ }
+
+ schedulable_tis = [
+ ti for ti in info.schedulable_tis if ti.task_id not in
skippable_task_ids
+ ]
+ for schedulable_ti in schedulable_tis:
+ if not hasattr(schedulable_ti, "task"):
+ schedulable_ti.task =
self.task.dag.get_task(schedulable_ti.task_id)
+
+ num = dag_run.schedule_tis(schedulable_tis)
+ self.log.info("%d downstream tasks scheduled from follow-on
schedule check", num)
+
+ session.commit()
+ except OperationalError:
+ # Any kind of DB error here is _non fatal_ as this block is
just an optimisation.
+ self.log.info("DB error when checking downstream tasks
ignored", exc_info=True)
Review comment:
The most likely case I expect here is a "cannot reach DB" network error.
But yeah, I like your message better.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]