ashb commented on a change in pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#discussion_r651557538
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1344,7 +1344,6 @@ def _run_scheduler_loop(self) -> None:
conf.getfloat('scheduler', 'clean_tis_without_dagrun_interval',
fallback=15.0),
self._clean_tis_without_dagrun,
)
-
for loop_count in itertools.count(start=1):
Review comment:
```suggestion
for loop_count in itertools.count(start=1):
```
(This restores the removed blank line, just so we avoid touching this file.)
##########
File path: airflow/jobs/local_task_job.py
##########
@@ -264,3 +265,15 @@ def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
exc_info=True,
)
session.rollback()
+
+ @provide_session
+ def _update_dagrun_state_for_paused_dag(self, session=None):
+ """
+ Checks for paused dags with DagRuns in the running state and
+ update the DagRun state if possible
+ """
+ dag_run = self.task_instance.get_dagrun()
+ if dag_run:
+ dag = dag_run.dag = self.task_instance.task.dag
+ if dag.get_is_paused():
+ dag_run.update_state(session=session, execute_callbacks=False)
Review comment:
If we pass `execute_callbacks=False` here, then the return value will
contain a list of callbacks, and this call ignores the return value -- so I
think we want `execute_callbacks=True` here.
##########
File path: airflow/jobs/local_task_job.py
##########
@@ -264,3 +265,15 @@ def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
exc_info=True,
)
session.rollback()
+
+ @provide_session
+ def _update_dagrun_state_for_paused_dag(self, session=None):
+ """
+ Checks for paused dags with DagRuns in the running state and
+ update the DagRun state if possible
+ """
+ dag_run = self.task_instance.get_dagrun()
Review comment:
```suggestion
dag_run = self.task_instance.get_dagrun(session=session)
```
##########
File path: airflow/jobs/local_task_job.py
##########
@@ -264,3 +265,15 @@ def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
exc_info=True,
)
session.rollback()
+
+ @provide_session
+ def _update_dagrun_state_for_paused_dag(self, session=None):
+ """
+ Checks for paused dags with DagRuns in the running state and
+ update the DagRun state if possible
+ """
+ dag_run = self.task_instance.get_dagrun()
+ if dag_run:
+ dag = dag_run.dag = self.task_instance.task.dag
+ if dag.get_is_paused():
Review comment:
```suggestion
if dag.get_is_paused(session=session):
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]