dstandish commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613488037



##########
File path: airflow/models/baseoperator.py
##########
@@ -822,9 +826,52 @@ def global_operator_extra_link_dict(self) -> Dict[str, 
Any]:
             raise AirflowException("Can't load operators")
         return {link.name: link for link in 
plugins_manager.global_operator_extra_links}
 
+    def _skip_if_not_latest(self, context: Optional[Any] = None) -> None:
+        """
+        Will raise :class:`~.AirflowSkipException` if dag run is not the 
latest dag run,
+        with the following exceptions:
+            - the operator's ``latest_only`` parameter is not set to ``True``
+            - the context dictionary has no dag run
+            - the context dictionary has no dag
+            - the ``context`` dictionary is ``None`` or empty
+            - the dag run is externally triggered
+        """
+        if not (self.latest_only is True and context):
+            return
+
+        import pendulum
+
+        from airflow.exceptions import AirflowSkipException
+
+        dag_run = context.get('dag_run')
+        if not dag_run:
+            return
+
+        if dag_run and dag_run.external_trigger:
+            self.log.info("Externally triggered DAG_Run: allowing execution to 
proceed.")
+            return
+
+        dag = context.get('dag')
+        if not dag:
+            return
+
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        self.log.info(  # pylint: disable=logging-fstring-interpolation
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
     @prepare_lineage
     def pre_execute(self, context: Any):
         """This hook is triggered right before self.execute() is called."""
+        self._skip_if_not_latest(context)

Review comment:
       I thought about doing something like that.
   
   One thought: `pre_execute` has the `@prepare_lineage` decorator, so people 
should probably call `super().pre_execute(context)` anyway.
   
   Of course, lineage is probably not used that often, but calling `super()` 
is probably "the right way"?
   
   In any case, I have no issue moving this out of `pre_execute`.
   
   There are two questions to answer: (1) where to locate the skip logic, and 
(2) where to call it from.
   
   Re (2): you are proposing calling it from 
`TaskInstance._prepare_and_execute_task_with_callbacks`. I'm OK with that.
   
   Re (1): do you think we should keep the skip logic (i.e. the function 
`_skip_if_not_latest`) on `BaseOperator`, or also move it to TI? I guess TI makes 
sense, because it's more of a scheduling concern than operator functionality.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to