Shadowsong27 edited a comment on issue #10194:
URL: https://github.com/apache/airflow/issues/10194#issuecomment-673933756


   I have just hacked together a workaround since I really need this feature. It 
may be a bit raw, but it's usable. It uses the first approach I mentioned above, 
with some abstractions. Basically, I create a function that takes any Airflow 
Operator class and returns a subclass of it that accepts additional arguments 
for skipping dates. 
   
   Code here:
   
   ```py
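   import datetime as dt
   from typing import Any, Callable, Tuple
   
   from airflow.exceptions import AirflowException
   from airflow.utils.decorators import apply_defaults
   
   
   def flexify_operator(operator: Any) -> Any: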
   
       class FlexibleOperator(operator):
   
           @apply_defaults
           def __init__(
                   self,
                   allowed_dates: Tuple = (),
                   skip_func: Callable = None,
                   *args,
                   **kwargs,
           ):
               super().__init__(*args, **kwargs)
   
               if allowed_dates and skip_func:
                   raise AirflowException(
                       "You can only provide either allowed dates or a custom skip function."
                   )
   
               self.allowed_dates = allowed_dates
               self.skip_func = skip_func
   
           def execute(self, context):
               execution_date = context.get('ds')
               is_skipping = False
   
               if self.skip_func:
                   if self.skip_func(execution_date):
                       is_skipping = True
   
               if self.allowed_dates:
                   if self.is_skipping_task(execution_date):
                       is_skipping = True
   
               if is_skipping:
                   self.log.warning('Skipping the task based on defined logic')
                   return
   
               self.log.warning("Executing the actual task ...")
               # Return the parent's result so its return value is not dropped
               return super().execute(context)
   
           def is_skipping_task(self, execution_date: str) -> bool:
               """Return True when the day of the month is not in allowed_dates."""
               date_obj = dt.datetime.strptime(execution_date, '%Y-%m-%d')
               return date_obj.day not in self.allowed_dates
   
       return FlexibleOperator
   
   
   def exec_tues_fri(execution_date: str) -> bool:
       """Return True to skip; the task runs on Tuesdays and Fridays only."""
       date_obj = dt.datetime.strptime(execution_date, '%Y-%m-%d')
       # weekday(): Monday is 0, so 1 is Tuesday and 4 is Friday
       return date_obj.weekday() not in [1, 4]
   ```
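
The skip decision made inside `execute` above can also be pulled out into a plain function, which makes it easy to check without a running Airflow instance. This is only a minimal sketch that assumes the same semantics as the class above; `should_skip` is a hypothetical name, not part of Airflow or of the workaround itself, and it raises `ValueError` rather than `AirflowException` so it has no Airflow dependency.

```python
import datetime as dt
from typing import Callable, Optional, Tuple


def should_skip(
    ds: str,
    allowed_dates: Tuple[int, ...] = (),
    skip_func: Optional[Callable[[str], bool]] = None,
) -> bool:
    """Hypothetical helper mirroring the decision in FlexibleOperator.execute."""
    if allowed_dates and skip_func:
        raise ValueError("Provide either allowed_dates or skip_func, not both.")

    # A custom callable decides on its own: True means skip.
    if skip_func is not None:
        return bool(skip_func(ds))

    # Otherwise skip when the day of the month is not in the allowed tuple.
    if allowed_dates:
        day = dt.datetime.strptime(ds, "%Y-%m-%d").day
        return day not in allowed_dates

    # Nothing configured: never skip.
    return False
```

With neither argument set, the function never skips, which matches the behaviour of the wrapped operator when it is used like the original one.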
   
   Usage in a DAG file:
   
   ```py
   from airflow.operators.postgres_operator import PostgresOperator
   PostgresOperator = utils.flexify_operator(PostgresOperator)
   
   PostgresOperator(
       task_id="test",
       postgres_conn_id="conn_id",
       sql="SELECT * FROM public.test_demo",
       dag=dag,
       allowed_dates=(12, 13, 14)
   )
   
   PostgresOperator(
       task_id="test2",
       postgres_conn_id="conn_id",
       sql="SELECT * FROM public.test_demo",
       dag=dag,
       skip_func=exec_tues_fri
   )
   ```
   
   OK, I have just updated this to be more generic: it now accepts either a tuple 
of days of the month on which to execute, or a custom callable, but not both. 
   
   Note: if the callable returns True, the task is skipped. So if you want to 
execute on Friday, return False when `weekday` == 4. If you find this awkward, 
you can change it.
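
One way to sidestep the inverted convention without touching the operator is a small adapter that turns a "run when True" predicate into the "skip when True" callable expected by `skip_func`. This is a sketch only; `skip_unless` and `is_friday` are hypothetical names introduced here for illustration.

```python
import datetime as dt
from typing import Callable


def skip_unless(run_func: Callable[[str], bool]) -> Callable[[str], bool]:
    """Turn a 'run when True' predicate into a 'skip when True' skip_func."""

    def skip_func(execution_date: str) -> bool:
        return not run_func(execution_date)

    return skip_func


def is_friday(execution_date: str) -> bool:
    """Return True on Fridays (weekday() == 4, with Monday being 0)."""
    return dt.datetime.strptime(execution_date, "%Y-%m-%d").weekday() == 4


# Skips on every day except Friday.
skip_unless_friday = skip_unless(is_friday)
```

The result could then be passed as `skip_func=skip_unless_friday` to a wrapped operator.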
   

