dstandish commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613488037
##########
File path: airflow/models/baseoperator.py
##########
@@ -822,9 +826,52 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
             raise AirflowException("Can't load operators")
         return {link.name: link for link in plugins_manager.global_operator_extra_links}
 
+    def _skip_if_not_latest(self, context: Optional[Any] = None) -> None:
+        """
+        Will raise :class:`~.AirflowSkipException` if dag run is not the latest dag run,
+        with the following exceptions:
+        - the operator's ``latest_only`` parameter is not set to ``True``
+        - the context dictionary has no dag run
+        - the context dictionary has no dag
+        - the ``context`` dictionary is ``None`` or empty
+        - the dag run is externally triggered
+        """
+        if not (self.latest_only is True and context):
+            return
+
+        import pendulum
+
+        from airflow.exceptions import AirflowSkipException
+
+        dag_run = context.get('dag_run')
+        if not dag_run:
+            return
+
+        if dag_run and dag_run.external_trigger:
+            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
+            return
+
+        dag = context.get('dag')
+        if not dag:
+            return
+
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        self.log.info(  # pylint: disable=logging-fstring-interpolation
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
     @prepare_lineage
     def pre_execute(self, context: Any):
         """This hook is triggered right before self.execute() is called."""
+        self._skip_if_not_latest(context)
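The check in `_skip_if_not_latest` treats a run as the latest when the current time falls in the half-open window `(left_window, right_window]`, where `left_window` is the schedule following the run's `execution_date` and `right_window` is the one after that. The sketch below reproduces the early returns and the window check without Airflow. It assumes a fixed `timedelta` schedule in place of `dag.following_schedule`, and uses hypothetical `FakeDag`, `FakeDagRun` and `SkipTask` stand-ins for the real DAG, DagRun and `AirflowSkipException`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


class SkipTask(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowSkipException."""


@dataclass
class FakeDagRun:
    # Hypothetical stand-in for a DagRun; only the field the check reads.
    external_trigger: bool = False


@dataclass
class FakeDag:
    # Assumption: a fixed interval replaces the DAG's real schedule logic.
    interval: timedelta

    def following_schedule(self, dttm: datetime) -> datetime:
        return dttm + self.interval


def skip_if_not_latest(
    latest_only: bool,
    context: Optional[Dict[str, Any]],
    now: datetime,
) -> None:
    """Mirror the guard clauses and window check from the diff."""
    # No-op unless latest_only is exactly True and context is non-empty.
    if not (latest_only is True and context):
        return
    dag_run = context.get("dag_run")
    if not dag_run:
        return
    # Externally (manually) triggered runs always proceed.
    if dag_run.external_trigger:
        return
    dag = context.get("dag")
    if not dag:
        return
    left_window = dag.following_schedule(context["execution_date"])
    right_window = dag.following_schedule(left_window)
    # The run counts as "latest" only if now lies in (left_window, right_window].
    if not left_window < now <= right_window:
        raise SkipTask("Not latest execution; skipping...")
```

For a daily schedule and an `execution_date` of 2021-04-01 00:00 UTC, the window is (2021-04-02, 2021-04-03], so the check passes at 2021-04-02 12:00 UTC and raises at 2021-04-05 00:00 UTC.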
Review comment:
I thought about doing something like that.
One thought: `pre_execute` has the `@prepare_lineage` decorator, so people
should probably call `super().pre_execute(context)` anyway.
Of course, lineage is probably not used that often, but calling super is
probably "the right way"?
In any case, I have no issue moving this out of `pre_execute`.
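The concern about subclasses can be illustrated in plain Python. In the sketch below, `Base`, `GoodOperator`, `BadOperator` and the `record_lineage` decorator are hypothetical stand-ins for `BaseOperator`, two subclasses and `@prepare_lineage`; it only shows that an override which omits `super().pre_execute(context)` silently skips both the decorator's work and any skip check placed inside the base method.

```python
import functools
from typing import Any, Callable, Dict, List

# Records which steps actually ran, in order.
calls: List[str] = []


def record_lineage(func: Callable) -> Callable:
    """Hypothetical stand-in for @prepare_lineage: does extra work before the hook."""

    @functools.wraps(func)
    def wrapper(self, context: Dict[str, Any]):
        calls.append("lineage")
        return func(self, context)

    return wrapper


class Base:
    @record_lineage
    def pre_execute(self, context: Dict[str, Any]) -> None:
        # Where the PR currently places the latest-only check.
        calls.append("skip_check")


class GoodOperator(Base):
    def pre_execute(self, context: Dict[str, Any]) -> None:
        # Delegating to super keeps both the decorator and the check.
        super().pre_execute(context)
        calls.append("good_setup")


class BadOperator(Base):
    def pre_execute(self, context: Dict[str, Any]) -> None:
        # No super() call: lineage and the skip check never run.
        calls.append("bad_setup")
```

Calling `GoodOperator().pre_execute({})` leaves `calls` as `["lineage", "skip_check", "good_setup"]`, while `BadOperator().pre_execute({})` records only `["bad_setup"]`.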
There are two questions to answer: (1) where to locate the skip logic, and
(2) where to call it from.
Re (2): you are proposing calling it from
`ti._prepare_and_execute_task_with_callbacks`. I'm OK with that.
Re (1): do you think we should keep the skip logic (i.e. the function
`_skip_if_not_latest`) on BaseOperator, or move it to TI as well? I guess TI makes
sense, since it's more of a scheduling concern than operator functionality.
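One way to picture the TI-side option: leave the operator hooks untouched and have the task instance run the check itself before calling `pre_execute`, so no override can bypass it. This is a sketch under assumptions, not Airflow code. `TaskRunner`, `Operator` and `SkipTask` are hypothetical stand-ins for `TaskInstance`, `BaseOperator` and `AirflowSkipException`, the method name `prepare_and_execute` only loosely mirrors `_prepare_and_execute_task_with_callbacks`, and the window check is injected as a plain predicate.

```python
from typing import Any, Callable, Dict, List


class SkipTask(Exception):
    """Hypothetical stand-in for AirflowSkipException."""


class Operator:
    def __init__(self, latest_only: bool = False) -> None:
        self.latest_only = latest_only
        self.ran: List[str] = []

    def pre_execute(self, context: Dict[str, Any]) -> None:
        # Subclasses may override this without calling super().
        self.ran.append("pre_execute")

    def execute(self, context: Dict[str, Any]) -> None:
        self.ran.append("execute")


class TaskRunner:
    """Hypothetical stand-in for the TaskInstance side of the proposal."""

    def __init__(self, task: Operator, is_latest: Callable[[Dict[str, Any]], bool]) -> None:
        self.task = task
        # Injected predicate, e.g. the window check from the diff.
        self.is_latest = is_latest
        self.state = "queued"

    def prepare_and_execute(self, context: Dict[str, Any]) -> None:
        try:
            # The check lives on the runner and runs before any operator hook.
            if self.task.latest_only and not self.is_latest(context):
                raise SkipTask("Not latest execution; skipping...")
            self.task.pre_execute(context)
            self.task.execute(context)
            self.state = "success"
        except SkipTask:
            self.state = "skipped"
```

Because the runner evaluates the predicate before touching the operator, an override of `pre_execute` that never calls `super()` no longer affects whether the skip happens.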
--
This is an automated message from the Apache Git Service.