Shadowsong27 commented on issue #10194:
URL: https://github.com/apache/airflow/issues/10194#issuecomment-673933756
I have just hacked some workaround since I really need this feature, it may
be a bit raw but it's usable, using the first approach I mentioned above, with
some abstractions. Basically I create a function that takes in any Airflow
Operator, and returned a children of it, opening up an additional argument to
skip dates.
Code here:
```py
def flexify_operator(operator: Any) -> Any:
class FlexibleOperator(operator):
@apply_defaults
def __init__(
self,
allowed_dates: Tuple = (),
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.allowed_dates = allowed_dates
def execute(self, context):
execution_date = context.get('ds')
if self.allowed_dates:
if self.is_skipping_task(execution_date):
self.log.warning('Skipping the task since the date is
not found in the allowed dates. ')
return
# no allowed dates passed, or not skippingtreat as normal
operator
super().execute(context)
def is_skipping_task(self, execution_date: str) -> bool:
date_obj = dt.datetime.strptime(execution_date, '%Y-%m-%d')
if date_obj.day not in self.allowed_dates:
return True
return False
return FlexibleOperator
```
usage in DAG file:
```py
from airflow.operators.postgres_operator import PostgresOperator
PostgresOperator = utils.flexify_operator(PostgresOperator)
PostgresOperator(
task_id=f"test",
postgres_conn_id="conn_id",
sql="SELECT * FROM public.test_demo",
dag=dag,
allowed_dates=(12, 13, 14)
)
```
So apparently this works only at day granularity, but it can be easily
hacked to fulfill your needs, hope this helps.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]