darkag commented on issue #61497:
URL: https://github.com/apache/airflow/issues/61497#issuecomment-3854771440

   Sure, here is the timetable code:
   ```python
   # plugins/special_days_timetable.py
   from datetime import timedelta
   from airflow.sdk import Variable
   import pendulum
   from airflow.plugins_manager import AirflowPlugin
   # Parenthesized so the statement survives line wrapping intact.
   from airflow.timetables.base import (
       Timetable,
       DagRunInfo,
       DataInterval,
       TimeRestriction,
   )
   from pendulum import DateTime
   from typing import Optional
   
   class SpecialDaysTimetable(Timetable):
       """Timetable that never schedules automated runs.

       ``next_dagrun_info`` always returns ``None``; manual runs get a data
       interval covering the 30 minutes that end at ``run_after``.
       """

       def __init__(self, tz: str = "Europe/Paris"):
           """Resolve the timezone and read the ``is_special`` Variable.

           Args:
               tz: Timezone name passed to ``pendulum.timezone``.
           """
           self._tz = pendulum.timezone(tz)
           # NOTE(review): this Variable lookup runs every time the timetable
           # is constructed. It is deliberately left as-is: it is the exact
           # line under discussion in this report.
           self.is_special_date = Variable.get("is_special", "False") == "True"

       @property
       def summary(self) -> str:
           """Return the fixed description string of this timetable."""
           return "Special date"

       # --- Timetable API ---
       def infer_manual_data_interval(
           self, run_after: DateTime
       ) -> DataInterval:
           """Return the 30-minute interval that ends at ``run_after``.

           Args:
               run_after: Moment used as the end of the interval.

           Returns:
               A ``DataInterval`` whose bounds are both expressed in the
               timetable's timezone.
           """
           # 30-minute interval ending at run_after
           end = run_after.in_timezone(self._tz)
           start = end - timedelta(minutes=30)
           return DataInterval(start=start, end=end)

       def next_dagrun_info(
           self,
           *,
           last_automated_data_interval: Optional[DataInterval],
           restriction: TimeRestriction,
       ) -> Optional[DagRunInfo]:
           """Return ``None`` unconditionally; both arguments are ignored."""
           return None
   
   
   class SpecialDaysTimetablePlugin(AirflowPlugin):
       """Plugin listing ``SpecialDaysTimetable`` in its ``timetables``."""

       timetables = [SpecialDaysTimetable]
       name = "special_days_timetable_plugin"
   
   ```
   
   And the DAG using it:
   
   ```python
   import pendulum
   from datetime import timedelta, datetime
   from airflow.decorators import dag,task
   from special_days_timetable import SpecialDaysTimetable
   
   # Timezone object used as ``tzinfo`` for the DAG's start_date.
   local_tz = pendulum.timezone("Europe/Paris")
   
   # Mapping passed to the DAG as ``default_args``.
   default_args = dict(
       owner="cineentrees",
       depends_on_past=False,
       email_on_failure=True,
       email_on_retry=False,
       retries=0,
       retry_delay=timedelta(minutes=1),
   )
   
   # DAG "test_of_special_days": scheduled by SpecialDaysTimetable and made of
   # a single task that does nothing.
   @dag(
       dag_id="test_of_special_days",
       schedule=SpecialDaysTimetable(),
       start_date=datetime(
           year=2023, month=1, day=1, hour=4, tzinfo=local_tz
       ),
       catchup=False,
       max_active_runs=1,
       dagrun_timeout=timedelta(minutes=10),
       default_args=default_args,
   )
   def run_dag():
       # The only task of the DAG; it performs no work.
       @task()
       def first_task():
           return None

       first_task()

   # Calling the decorated function builds the DAG.
   run_dag()
   ```
   
   Everything works if I replace
   `self.is_special_date = Variable.get("is_special", "False") == "True"`
   with `self.is_special_date = False`.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to