yuqian90 commented on a change in pull request #8867:
URL: https://github.com/apache/airflow/pull/8867#discussion_r427012430
##########
File path: airflow/sensors/base_sensor_operator.py
##########
@@ -131,6 +134,19 @@ def execute(self, context: Dict) -> Any:
try_number += 1
self.log.info("Success criteria met. Exiting.")
+ def _timeout_task(self, context: Dict) -> None:
+ # If sensor is in soft fail mode but will be retried then
+ # give it a chance and fail with timeout.
+ # This gives the ability to set up non-blocking AND soft-fail sensors.
+ if self.soft_fail and not context['ti'].is_eligible_to_retry():
+ if self.downstream_skip:
+ self._do_skip_downstream_tasks(context)
+ raise AirflowSkipException('Snap. Time is OUT.')
+ else:
+ if self.downstream_skip and not context['ti'].is_eligible_to_retry():
+ self._do_skip_downstream_tasks(context)
+ raise AirflowSensorTimeout('Snap. Time is OUT.')
Review comment:
This logic does not look right. It changes the original behaviour
even when the user sets `downstream_skip=True`. We should not skip downstream
tasks and then raise `AirflowSensorTimeout`; I can't think of any scenario in
which that would be useful. The original behaviour is to just raise
`AirflowSensorTimeout`.
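A minimal sketch of the branching described above, for illustration only. It uses stand-in exception classes instead of the real Airflow imports so that it runs on its own, and `timeout_task`, `eligible_to_retry` and `skip_downstream` are hypothetical names, not code from this PR. The point is that downstream tasks are only skipped on the soft-fail path, and the other path does nothing except raise `AirflowSensorTimeout`.

```python
from typing import Callable


class AirflowSkipException(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException."""


class AirflowSensorTimeout(Exception):
    """Stand-in for airflow.exceptions.AirflowSensorTimeout."""


def timeout_task(
    soft_fail: bool,
    downstream_skip: bool,
    eligible_to_retry: bool,
    skip_downstream: Callable[[], None],
) -> None:
    """Hypothetical, flattened version of `_timeout_task()`.

    `eligible_to_retry` plays the role of context['ti'].is_eligible_to_retry()
    and `skip_downstream` the role of self._do_skip_downstream_tasks(context).
    """
    if soft_fail and not eligible_to_retry:
        # Soft-fail path with no retries left: optionally skip downstream
        # tasks, then mark this task as skipped.
        if downstream_skip:
            skip_downstream()
        raise AirflowSkipException('Snap. Time is OUT.')
    # Every other case: never touch downstream tasks, just time out.
    raise AirflowSensorTimeout('Snap. Time is OUT.')
```

With this shape, `downstream_skip` has no effect unless `soft_fail` is set, so a sensor that is not in soft-fail mode keeps the pre-PR behaviour regardless of the new flag.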
##########
File path: airflow/sensors/base_sensor_operator.py
##########
@@ -70,6 +72,7 @@ def __init__(self,
timeout: float = 60 * 60 * 24 * 7,
soft_fail: bool = False,
mode: str = 'poke',
+ downstream_skip=None,
Review comment:
Why is the default `None`? If you want the default to be the original
behaviour before this PR, it should default to `True` and be handled
accordingly in `_timeout_task()`.
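As a rough sketch of that suggestion, assuming the pre-PR behaviour was to skip downstream tasks whenever a soft-fail sensor times out with no retries left (which is what the diff context suggests). `SketchSensor` and `skips_downstream_on_timeout` are hypothetical names for illustration and are not part of `BaseSensorOperator`.

```python
class SketchSensor:
    """Hypothetical stand-in for the sensor constructor, not the real class."""

    def __init__(self, soft_fail: bool = False, downstream_skip: bool = True) -> None:
        self.soft_fail = soft_fail
        # A typed boolean default of True means callers who never pass the
        # new argument get the same skipping behaviour as before.
        self.downstream_skip = downstream_skip

    def skips_downstream_on_timeout(self, eligible_to_retry: bool) -> bool:
        # Downstream tasks are skipped only on the soft-fail path, when no
        # retry is left and the flag has not been turned off.
        return self.soft_fail and self.downstream_skip and not eligible_to_retry
```

A plain `bool` default also avoids a third `None` state that `_timeout_task()` would otherwise have to interpret.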
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.