matthewblock commented on code in PR #27506:
URL: https://github.com/apache/airflow/pull/27506#discussion_r1014272829
##########
airflow/models/taskinstance.py:
##########
@@ -2557,6 +2557,65 @@ def ti_selector_condition(cls, vals: Collection[str |
tuple[str, int]]) -> Colum
return filters[0]
return or_(*filters)
+ @Sentry.enrich_errors
+ @provide_session
+ def schedule_downstream_tasks(self, session=None):
+ """
+ This mimics the local task job's mini scheduler, and it's only for
testing
+ :meta: private
+ """
+ from sqlalchemy.exc import OperationalError
+
+ from airflow.models import DagRun
+
+ try:
+ # Re-select the row with a lock
+ dag_run = with_row_locks(
+ session.query(DagRun).filter_by(
+ dag_id=self.dag_id,
+ run_id=self.run_id,
+ ),
+ session=session,
+ ).one()
+
+ task = self.task
+ if TYPE_CHECKING:
+ assert task.dag
+
+ # Get a partial DAG with just the specific tasks we want to
examine.
+ # In order for dep checks to work correctly, we include ourself (so
+ # TriggerRuleDep can check the state of the task we just executed).
+ partial_dag = task.dag.partial_subset(
+ task.downstream_task_ids,
+ include_downstream=True,
+ include_upstream=False,
+ include_direct_upstream=True,
+ )
+
+ dag_run.dag = partial_dag
+ info = dag_run.task_instance_scheduling_decisions(session)
+
+ skippable_task_ids = {
+ task_id for task_id in partial_dag.task_ids if task_id not in
task.downstream_task_ids
+ }
+
+ schedulable_tis = [ti for ti in info.schedulable_tis if ti.task_id
not in skippable_task_ids]
+ for schedulable_ti in schedulable_tis:
+ if not hasattr(schedulable_ti, "task"):
+ schedulable_ti.task =
task.dag.get_task(schedulable_ti.task_id)
+
+ num = dag_run.schedule_tis(schedulable_tis)
+ self.log.info("%d downstream tasks scheduled from follow-on
schedule check", num)
+
+ except OperationalError as e:
+ # Any kind of DB error here is _non fatal_ as this block is just
an optimisation.
+ self.log.info(
Review Comment:
Same as my [previous
comment](https://github.com/apache/airflow/pull/27506/files#r1014270814): if
this logs additional details about an exception, could it be emitted at
`WARNING` or `ERROR` level rather than `INFO`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]