dstandish commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613488037
##########
File path: airflow/models/baseoperator.py
##########
@@ -822,9 +826,52 @@ def global_operator_extra_link_dict(self) -> Dict[str,
Any]:
raise AirflowException("Can't load operators")
return {link.name: link for link in
plugins_manager.global_operator_extra_links}
+ def _skip_if_not_latest(self, context: Optional[Any] = None) -> None:
+ """
+ Will raise :class:`~.AirflowSkipException` if dag run is not the
latest dag run,
+ with the following exceptions:
+ - the operator's ``latest_only`` parameter is not set to ``True``
+ - the context dictionary has no dag run
+ - the context dictionary has no dag
+ - the ``context`` dictionary is ``None`` or empty
+ - the dag run is externally triggered
+ """
+ if not (self.latest_only is True and context):
+ return
+
+ import pendulum
+
+ from airflow.exceptions import AirflowSkipException
+
+ dag_run = context.get('dag_run')
+ if not dag_run:
+ return
+
+ if dag_run and dag_run.external_trigger:
+ self.log.info("Externally triggered DAG_Run: allowing execution to
proceed.")
+ return
+
+ dag = context.get('dag')
+ if not dag:
+ return
+
+ now = pendulum.now('UTC')
+ left_window = dag.following_schedule(context['execution_date'])
+ right_window = dag.following_schedule(left_window)
+ self.log.info( # pylint: disable=logging-fstring-interpolation
+ f"Checking latest only:\n"
+ f"\tleft_window: {left_window}\n"
+ f"\tright_window: {right_window}\n"
+ f"\tnow: {now}\n",
+ )
+
+ if not left_window < now <= right_window:
+ raise AirflowSkipException('Not latest execution; skipping...')
+
@prepare_lineage
def pre_execute(self, context: Any):
"""This hook is triggered right before self.execute() is called."""
+ self._skip_if_not_latest(context)
Review comment:
I thought about doing something like that.
Regarding `pre_execute`: as with any non-abstract method, a user who overrides it
can still call `super().pre_execute()`.
But I have no objection to moving this out of `pre_execute` if that's what folks
think is best.
So, as you suggested, we can move it to
`ti._prepare_and_execute_task_with_callbacks`.
Do you think we should keep the skip logic (i.e. the `_skip_if_not_latest`
function) on the base operator, or move it to the TI?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]