darkag commented on issue #61497:
URL: https://github.com/apache/airflow/issues/61497#issuecomment-3854771440
Sure, here is the timetable code:
```python
# plugins/special_days_timetable.py
from datetime import timedelta
from airflow.sdk import Variable
import pendulum
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import Timetable, DagRunInfo, DataInterval,
TimeRestriction
from pendulum import DateTime
from typing import Optional
class SpecialDaysTimetable(Timetable):
    """Timetable that never plans automated runs.

    Manually triggered runs get a 30-minute data interval ending at the
    trigger time, expressed in the configured timezone.
    """

    def __init__(self, tz: str = "Europe/Paris"):
        """Create the timetable.

        Args:
            tz: Timezone name handed to ``pendulum.timezone``.
        """
        # Keep the plain name next to the timezone object so that
        # ``serialize`` can emit a simple string value.
        self._tz_name = tz
        self._tz = pendulum.timezone(tz)
        # The Variable is read here, i.e. every time an instance is built,
        # and the result is stored as a boolean on the instance.
        self.is_special_date = Variable.get("is_special", "False") == "True"

    @property
    def summary(self) -> str:
        """Short human-readable description of this timetable."""
        return "Special date"

    def serialize(self) -> dict:
        """Return the constructor arguments needed to rebuild this timetable."""
        return {"tz": self._tz_name}

    @classmethod
    def deserialize(cls, data: dict) -> Timetable:
        """Rebuild a timetable from the mapping produced by ``serialize``.

        Falls back to the constructor's default timezone when ``data`` has
        no ``"tz"`` entry (e.g. it was serialized before ``serialize``
        existed).
        """
        return cls(tz=data.get("tz", "Europe/Paris"))

    # --- Timetable API ---
    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        """Return the data interval for a manually triggered run.

        Args:
            run_after: Moment the manual run was requested.

        Returns:
            A 30-minute interval ending at ``run_after``, with both bounds
            converted to the timetable's timezone.
        """
        end = run_after.in_timezone(self._tz)
        return DataInterval(start=end - timedelta(minutes=30), end=end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        """Always return ``None``: this timetable schedules no automated runs."""
        return None
class SpecialDaysTimetablePlugin(AirflowPlugin):
    """Airflow plugin that exposes ``SpecialDaysTimetable``."""

    # Plugin identifier.
    name = "special_days_timetable_plugin"
    # Timetable classes contributed by this plugin.
    timetables = [SpecialDaysTimetable]
```
and the DAG using it:
```python
import pendulum
from datetime import timedelta, datetime
from airflow.decorators import dag,task
from special_days_timetable import SpecialDaysTimetable
# Timezone used for the DAG's start date.
local_tz = pendulum.timezone("Europe/Paris")

# Arguments passed to the DAG as ``default_args``.
default_args = {
    "owner": "cineentrees",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=1),
}
# Single-task DAG scheduled by the custom SpecialDaysTimetable.
@dag(
    dag_id="test_of_special_days",
    default_args=default_args,
    schedule=SpecialDaysTimetable(),
    start_date=datetime(2023, 1, 1, 4, tzinfo=local_tz),
    max_active_runs=1,
    catchup=False,
    dagrun_timeout=timedelta(minutes=10),
)
def run_dag():
    # Placeholder task: does nothing.
    @task()
    def first_task():
        pass

    first_task()


# Instantiate the DAG at module import time.
run_dag()
```
Everything works if I replace `self.is_special_date = Variable.get("is_special", "False") == "True"` with `self.is_special_date = False`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]