This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-4-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit a10fc9aefee0054effb2ccb24717d90bad53c6a9 Author: Igor <[email protected]> AuthorDate: Fri Sep 9 04:05:52 2022 +0200 Add more weekday operator and sensor examples #26071 (#26098) (cherry picked from commit dd6b2e4e6cb89d9eea2f3db790cb003a2e89aeff) --- .../example_branch_day_of_week_operator.py | 13 +++++++- airflow/operators/weekday.py | 39 +++++++++++++++++++++- airflow/sensors/weekday.py | 15 +++++++-- 3 files changed, 63 insertions(+), 4 deletions(-) diff --git a/airflow/example_dags/example_branch_day_of_week_operator.py b/airflow/example_dags/example_branch_day_of_week_operator.py index 62b0bc6ce9..76b524b932 100644 --- a/airflow/example_dags/example_branch_day_of_week_operator.py +++ b/airflow/example_dags/example_branch_day_of_week_operator.py @@ -24,6 +24,7 @@ import pendulum from airflow import DAG from airflow.operators.empty import EmptyOperator from airflow.operators.weekday import BranchDayOfWeekOperator +from airflow.utils.weekday import WeekDay with DAG( dag_id="example_weekday_branch_operator", @@ -35,6 +36,8 @@ with DAG( # [START howto_operator_day_of_week_branch] empty_task_1 = EmptyOperator(task_id='branch_true') empty_task_2 = EmptyOperator(task_id='branch_false') + empty_task_3 = EmptyOperator(task_id='branch_weekend') + empty_task_4 = EmptyOperator(task_id='branch_mid_week') branch = BranchDayOfWeekOperator( task_id="make_choice", @@ -42,7 +45,15 @@ with DAG( follow_task_ids_if_false="branch_false", week_day="Monday", ) + branch_weekend = BranchDayOfWeekOperator( + task_id="make_weekend_choice", + follow_task_ids_if_true="branch_weekend", + follow_task_ids_if_false="branch_mid_week", + week_day={WeekDay.SATURDAY, WeekDay.SUNDAY}, + ) - # Run empty_task_1 if branch executes on Monday + # Run empty_task_1 if branch executes on Monday, empty_task_2 otherwise branch >> [empty_task_1, empty_task_2] + # Run empty_task_3 if it's a weekend, empty_task_4 otherwise + empty_task_2 >> branch_weekend >> [empty_task_3, empty_task_4] # [END 
howto_operator_day_of_week_branch] diff --git a/airflow/operators/weekday.py b/airflow/operators/weekday.py index 2d3aa0bda9..1bb8354d19 100644 --- a/airflow/operators/weekday.py +++ b/airflow/operators/weekday.py @@ -31,6 +31,40 @@ class BranchDayOfWeekOperator(BaseBranchOperator): For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:BranchDayOfWeekOperator` + **Example** (with single day): :: + + from airflow.operators.empty import EmptyOperator + + monday = EmptyOperator(task_id='monday') + other_day = EmptyOperator(task_id='other_day') + + monday_check = BranchDayOfWeekOperator( + task_id='monday_check', + week_day='Monday', + use_task_logical_date=True, + follow_task_ids_if_true='monday', + follow_task_ids_if_false='other_day', + dag=dag) + monday_check >> [monday, other_day] + + **Example** (with :class:`~airflow.utils.weekday.WeekDay` enum): :: + + # import WeekDay Enum + from airflow.utils.weekday import WeekDay + from airflow.operators.empty import EmptyOperator + + workday = EmptyOperator(task_id='workday') + weekend = EmptyOperator(task_id='weekend') + weekend_check = BranchDayOfWeekOperator( + task_id='weekend_check', + week_day={WeekDay.SATURDAY, WeekDay.SUNDAY}, + use_task_logical_date=True, + follow_task_ids_if_true='weekend', + follow_task_ids_if_false='workday', + dag=dag) + # add downstream dependencies as you would do with any branch operator + weekend_check >> [workday, weekend] + :param follow_task_ids_if_true: task id or task ids to follow if criteria met :param follow_task_ids_if_false: task id or task ids to follow if criteria does not met :param week_day: Day of the week to check (full name). Optionally, a set @@ -42,9 +76,12 @@ class BranchDayOfWeekOperator(BaseBranchOperator): * ``{WeekDay.TUESDAY}`` * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}`` + To use ``WeekDay`` enum, import it from ``airflow.utils.weekday`` + :param use_task_logical_date: If ``True``, uses task's logical date to compare with is_today. 
Execution Date is Useful for backfilling. If ``False``, uses system's day of the week. + :param use_task_execution_day: deprecated parameter, same effect as `use_task_logical_date` """ def __init__( @@ -52,7 +89,7 @@ class BranchDayOfWeekOperator(BaseBranchOperator): *, follow_task_ids_if_true: Union[str, Iterable[str]], follow_task_ids_if_false: Union[str, Iterable[str]], - week_day: Union[str, Iterable[str]], + week_day: Union[str, Iterable[str], WeekDay, Iterable[WeekDay]], use_task_logical_date: bool = False, use_task_execution_day: bool = False, **kwargs, diff --git a/airflow/sensors/weekday.py b/airflow/sensors/weekday.py index ec5abbb413..da0ba9591a 100644 --- a/airflow/sensors/weekday.py +++ b/airflow/sensors/weekday.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. import warnings +from typing import Iterable, Union from airflow.exceptions import RemovedInAirflow3Warning from airflow.sensors.base import BaseSensorOperator @@ -66,13 +67,23 @@ class DayOfWeekSensor(BaseSensorOperator): * ``{WeekDay.TUESDAY}`` * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}`` + To use ``WeekDay`` enum, import it from ``airflow.utils.weekday`` + :param use_task_logical_date: If ``True``, uses task's logical date to compare with week_day. Execution Date is Useful for backfilling. If ``False``, uses system's day of the week. Useful when you don't want to run anything on weekdays on the system. 
+ :param use_task_execution_day: deprecated parameter, same effect as `use_task_logical_date` """ - def __init__(self, *, week_day, use_task_logical_date=False, use_task_execution_day=False, **kwargs): + def __init__( + self, + *, + week_day: Union[str, Iterable[str], WeekDay, Iterable[WeekDay]], + use_task_logical_date: bool = False, + use_task_execution_day: bool = False, + **kwargs, + ) -> None: super().__init__(**kwargs) self.week_day = week_day self.use_task_logical_date = use_task_logical_date @@ -85,7 +96,7 @@ class DayOfWeekSensor(BaseSensorOperator): ) self._week_day_num = WeekDay.validate_week_day(week_day) - def poke(self, context: Context): + def poke(self, context: Context) -> bool: self.log.info( 'Poking until weekday is in %s, Today is %s', self.week_day,
