turbaszek commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r501236528
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -567,302 +567,71 @@ def update_import_errors(session: Session, dagbag:
DagBag) -> None:
stacktrace=stacktrace))
session.commit()
- # pylint: disable=too-many-return-statements,too-many-branches
@provide_session
- def create_dag_run(
- self,
- dag: DAG,
- dag_runs: Optional[List[DagRun]] = None,
- session: Session = None,
- ) -> Optional[DagRun]:
- """
- This method checks whether a new DagRun needs to be created
- for a DAG based on scheduling interval.
- Returns DagRun if one is scheduled. Otherwise returns None.
- """
- # pylint: disable=too-many-nested-blocks
- if not dag.schedule_interval:
- return None
-
- active_runs: List[DagRun]
- if dag_runs is None:
- active_runs = DagRun.find(
- dag_id=dag.dag_id,
- state=State.RUNNING,
- external_trigger=False,
- session=session
- )
- else:
- active_runs = [
- dag_run
- for dag_run in dag_runs
- if not dag_run.external_trigger
- ]
- # return if already reached maximum active runs and no timeout setting
- if len(active_runs) >= dag.max_active_runs and not dag.dagrun_timeout:
- return None
- timed_out_runs = 0
- for dr in active_runs:
- if (
- dr.start_date and dag.dagrun_timeout and
- dr.start_date < timezone.utcnow() - dag.dagrun_timeout
- ):
- dr.state = State.FAILED
- dr.end_date = timezone.utcnow()
- dag.handle_callback(dr, success=False, reason='dagrun_timeout',
- session=session)
- timed_out_runs += 1
- session.commit()
- if len(active_runs) - timed_out_runs >= dag.max_active_runs:
- return None
-
- # this query should be replaced by find dagrun
- last_scheduled_run: Optional[datetime.datetime] = (
- session.query(func.max(DagRun.execution_date))
- .filter_by(dag_id=dag.dag_id)
- .filter(or_(
- DagRun.external_trigger == False, # noqa: E712 pylint:
disable=singleton-comparison
- DagRun.run_type == DagRunType.SCHEDULED.value
- )).scalar()
- )
-
- # don't schedule @once again
- if dag.schedule_interval == '@once' and last_scheduled_run:
- return None
-
- # don't do scheduler catchup for dag's that don't have dag.catchup =
True
- if not (dag.catchup or dag.schedule_interval == '@once'):
- # The logic is that we move start_date up until
- # one period before, so that timezone.utcnow() is AFTER
- # the period end, and the job can be created...
- now = timezone.utcnow()
- next_start = dag.following_schedule(now)
- last_start = dag.previous_schedule(now)
- if next_start <= now or isinstance(dag.schedule_interval,
timedelta):
- new_start = last_start
- else:
- new_start = dag.previous_schedule(last_start)
-
- if dag.start_date:
- if new_start >= dag.start_date:
- dag.start_date = new_start
- else:
- dag.start_date = new_start
-
- next_run_date = None
- if not last_scheduled_run:
- # First run
- task_start_dates = [t.start_date for t in dag.tasks]
- if task_start_dates:
- next_run_date = dag.normalize_schedule(min(task_start_dates))
- self.log.debug(
- "Next run date based on tasks %s",
- next_run_date
- )
- else:
- next_run_date = dag.following_schedule(last_scheduled_run)
-
- # make sure backfills are also considered
- last_run = dag.get_last_dagrun(session=session)
- if last_run and next_run_date:
- while next_run_date <= last_run.execution_date:
- next_run_date = dag.following_schedule(next_run_date)
-
- # don't ever schedule prior to the dag's start_date
- if dag.start_date:
- next_run_date = (dag.start_date if not next_run_date
- else max(next_run_date, dag.start_date))
- if next_run_date == dag.start_date:
- next_run_date = dag.normalize_schedule(dag.start_date)
-
- self.log.debug(
- "Dag start date: %s. Next run date: %s",
- dag.start_date, next_run_date
- )
-
- # don't ever schedule in the future or if next_run_date is None
- if not next_run_date or next_run_date > timezone.utcnow():
- return None
-
- # this structure is necessary to avoid a TypeError from concatenating
- # NoneType
- period_end = None
- if dag.schedule_interval == '@once':
- period_end = next_run_date
- elif next_run_date:
- period_end = dag.following_schedule(next_run_date)
-
- # Don't schedule a dag beyond its end_date (as specified by the dag
param)
- if next_run_date and dag.end_date and next_run_date > dag.end_date:
- return None
-
- # Don't schedule a dag beyond its end_date (as specified by the task
params)
- # Get the min task end date, which may come from the dag.default_args
- min_task_end_date = min([t.end_date for t in dag.tasks if t.end_date],
default=None)
- if next_run_date and min_task_end_date and next_run_date >
min_task_end_date:
- return None
-
- if next_run_date and period_end and period_end <= timezone.utcnow():
- next_run = dag.create_dagrun(
- run_type=DagRunType.SCHEDULED,
- execution_date=next_run_date,
- start_date=timezone.utcnow(),
- state=State.RUNNING,
- external_trigger=False
- )
- return next_run
-
- return None
-
- @provide_session
- def _process_task_instances(
- self, dag: DAG, dag_runs: List[DagRun], session: Session = None
- ) -> List[TaskInstanceKey]:
- """
- This method schedules the tasks for a single DAG by looking at the
- active DAG runs and adding task instances that should run to the
- queue.
- """
- # update the state of the previously active dag runs
- active_dag_runs = 0
- task_instances_list = []
- for run in dag_runs:
- self.log.info("Examining DAG run %s", run)
- # don't consider runs that are executed in the future unless
- # specified by config and schedule_interval is None
- if run.execution_date > timezone.utcnow() and not
dag.allow_future_exec_dates:
- self.log.error(
- "Execution date is in future: %s",
- run.execution_date
- )
- continue
-
- if active_dag_runs >= dag.max_active_runs:
- self.log.info("Number of active dag runs reached
max_active_run.")
- break
-
- # skip backfill dagruns for now as long as they are not really
scheduled
- if run.is_backfill:
- continue
-
- # todo: run.dag is transient but needs to be set
- run.dag = dag # type: ignore
- # todo: preferably the integrity check happens at dag collection
time
- run.verify_integrity(session=session)
- ready_tis = run.update_state(session=session)
- if run.state == State.RUNNING:
- active_dag_runs += 1
- self.log.debug("Examining active DAG run: %s", run)
- for ti in ready_tis:
- self.log.debug('Queuing task: %s', ti)
- task_instances_list.append(ti.key)
- return task_instances_list
-
- @provide_session
- def _process_dags(self, dags: List[DAG], session: Session = None) ->
List[TaskInstanceKey]:
- """
- Iterates over the dags and processes them. Processing includes:
-
- 1. Create appropriate DagRun(s) in the DB.
- 2. Create appropriate TaskInstance(s) in the DB.
- 3. Send emails for tasks that have missed SLAs (if CHECK_SLAS config
enabled).
-
- :param dags: the DAGs from the DagBag to process
- :type dags: List[airflow.models.DAG]
- :rtype: list[TaskInstance]
- :return: A list of generated TaskInstance objects
- """
- check_slas: bool = conf.getboolean('core', 'CHECK_SLAS', fallback=True)
- use_job_schedule: bool = conf.getboolean('scheduler',
'USE_JOB_SCHEDULE')
-
- # pylint: disable=too-many-nested-blocks
- tis_out: List[TaskInstanceKey] = []
- dag_ids: List[str] = [dag.dag_id for dag in dags]
- dag_runs = DagRun.find(dag_id=dag_ids, state=State.RUNNING,
session=session)
- # As per the docs of groupby
(https://docs.python.org/3/library/itertools.html#itertools.groupby)
- # we need to use `list()` otherwise the result will be wrong/incomplete
- dag_runs_by_dag_id: Dict[str, List[DagRun]] = {
- k: list(v) for k, v in groupby(dag_runs, lambda d: d.dag_id)
- }
-
- for dag in dags:
- dag_id: str = dag.dag_id
- self.log.info("Processing %s", dag_id)
- dag_runs_for_dag = dag_runs_by_dag_id.get(dag_id) or []
-
- # Only creates DagRun for DAGs that are not subdag since
- # DagRun of subdags are created when SubDagOperator executes.
- if not dag.is_subdag and use_job_schedule:
- dag_run = self.create_dag_run(dag, dag_runs=dag_runs_for_dag)
- if dag_run:
- dag_runs_for_dag.append(dag_run)
- expected_start_date =
dag.following_schedule(dag_run.execution_date)
- if expected_start_date:
- schedule_delay = dag_run.start_date -
expected_start_date
- Stats.timing(
-
'dagrun.schedule_delay.{dag_id}'.format(dag_id=dag.dag_id),
- schedule_delay)
- self.log.info("Created %s", dag_run)
-
- if dag_runs_for_dag:
- tis_out.extend(self._process_task_instances(dag,
dag_runs_for_dag))
- if check_slas:
- self.manage_slas(dag)
-
- return tis_out
-
- def _find_dags_to_process(self, dags: List[DAG]) -> List[DAG]:
- """
- Find the DAGs that are not paused to process.
-
- :param dags: specified DAGs
- :return: DAGs to process
- """
- if self.dag_ids:
- dags = [dag for dag in dags
- if dag.dag_id in self.dag_ids]
- return dags
-
- @provide_session
- def execute_on_failure_callbacks(
+ def execute_callbacks(
self,
dagbag: DagBag,
- failure_callback_requests: List[FailureCallbackRequest],
+ callback_requests: List[CallbackRequest],
session: Session = None
) -> None:
"""
Execute on failure callbacks. These objects can come from SchedulerJob
or from
DagFileProcessorManager.
- :param failure_callback_requests: failure callbacks to execute
- :type failure_callback_requests:
List[airflow.utils.dag_processing.FailureCallbackRequest]
+ :param dagbag: Dag Bag of dags
+ :param callback_requests: failure callbacks to execute
+ :type callback_requests:
List[airflow.utils.callback_requests.CallbackRequest]
:param session: DB session.
"""
- for request in failure_callback_requests:
- simple_ti = request.simple_task_instance
- if simple_ti.dag_id in dagbag.dags:
- dag = dagbag.dags[simple_ti.dag_id]
- if simple_ti.task_id in dag.task_ids:
- task = dag.get_task(simple_ti.task_id)
- ti = TI(task, simple_ti.execution_date)
- # Get properties needed for failure handling from
SimpleTaskInstance.
- ti.start_date = simple_ti.start_date
- ti.end_date = simple_ti.end_date
- ti.try_number = simple_ti.try_number
- ti.state = simple_ti.state
- ti.test_mode = self.UNIT_TEST_MODE
+ for request in callback_requests:
+ try:
+ if isinstance(request, TaskCallbackRequest):
+ self._execute_task_callbacks(dagbag, request)
+ elif isinstance(request, SlaCallbackRequest):
+ self.manage_slas(dagbag.dags.get(request.dag_id))
+ elif isinstance(request, DagCallbackRequest):
+ self._execute_dag_callbacks(dagbag, request, session)
+ except Exception: # pylint: disable=broad-except
+ self.log.exception("Error executing callback for File: %s",
request.full_filepath)
Review comment:
```suggestion
self.log.exception("Error executing %s callback for file:
%s", request.__class__.__name__, request.full_filepath)
```
Just to give more information
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]