wyatthomg opened a new issue #22242:
URL: https://github.com/apache/airflow/issues/22242


   ### Apache Airflow version
   
   2.2.4 (latest released)
   
   ### What happened
   
   When I change the tzinfo in the Timetable example from UTC to CST and set the DAG's catchup=True, the DAG's backfill runs only once.
   With UTC, as in the example, it runs as I need. How can I fix it?
   
   ### What you expected to happen
   
   I need to use tzinfo (CST) in the timetable.
   
   ### How to reproduce
   
   This is the Timetable example; I need to change tzinfo to CST.
   But if I use CST (tz Asia/Shanghai), my DAG backfills only once.
   ```
   from datetime import timedelta
   from typing import Optional
   
   from pendulum import Date, DateTime, Time, timezone
   
   from airflow.plugins_manager import AirflowPlugin
   from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
   
   UTC = timezone("UTC")
   CST = timezone("Asia/Shanghai")
   # CST = timezone("UTC")  # If I use this instead, everything works.
   
   # Same as the documentation example except for tzinfo.
   class AfterWorkdayTimetable(Timetable):
       def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
           weekday = run_after.weekday()
           if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
               days_since_friday = (run_after.weekday() - 4) % 7
               delta = timedelta(days=days_since_friday)
           else:  # Otherwise the interval is yesterday.
               delta = timedelta(days=1)
           start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=CST)
           end = (start + timedelta(days=1)).replace(tzinfo=CST)
           return DataInterval(start=start, end=end)
   
       def next_dagrun_info(
           self,
           *,
           last_automated_data_interval: Optional[DataInterval],
           restriction: TimeRestriction,
       ) -> Optional[DagRunInfo]:
           if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
               last_start = last_automated_data_interval.start
               last_start_weekday = last_start.weekday()
               if 0 <= last_start_weekday < 4:  # Last run on Monday through Thursday -- next is tomorrow.
                   delta = timedelta(days=1)
               else:  # Last run on Friday -- skip to next Monday.
                   delta = timedelta(days=(7 - last_start_weekday))
               next_start = DateTime.combine((last_start + delta).date(), Time.min).replace(tzinfo=CST)
           else:  # This is the first ever run on the regular schedule.
               next_start = restriction.earliest
               if next_start is None:  # No start_date. Don't schedule.
                   return None
               if not restriction.catchup:
                   # If the DAG has catchup=False, today is the earliest to consider.
                   next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=CST))
               elif next_start.time() != Time.min:
                   # If earliest does not fall on midnight, skip to the next day.
                   next_day = next_start.date() + timedelta(days=1)
                   next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=CST)
               next_start_weekday = next_start.weekday()
               if next_start_weekday in (5, 6):  # If next start is in the weekend, go to next Monday.
                   delta = timedelta(days=(7 - next_start_weekday))
                   next_start = next_start + delta
           if restriction.latest is not None and next_start > restriction.latest:
               return None  # Over the DAG's scheduled end; don't schedule.
           return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)).replace(tzinfo=CST))
   
   
   class WorkdayTimetablePlugin(AirflowPlugin):
       name = "workday_timetable_plugin"
       timetables = [AfterWorkdayTimetable]
   
   
   ```
   
   
![1](https://user-images.githubusercontent.com/75414662/158155868-e14c0e9e-7fe9-4039-8502-076385e6fe6f.png)
   
   
   And the scheduler log:
   ```
   [2022-03-14 18:15:31,465] {dag.py:2937} INFO - Setting next_dagrun for extract_to_mdm_10 to 2022-02-02T00:00:00+08:00
   [2022-03-14 18:15:32,051] {dag.py:2937} INFO - Setting next_dagrun for extract_to_mdm_10 to 2022-02-02T00:00:00+08:00
   [2022-03-14 18:15:33,147] {dag.py:2937} INFO - Setting next_dagrun for extract_to_mdm_10 to 2022-02-02T00:00:00+08:00
   [2022-03-14 18:15:34,195] {dag.py:2937} INFO - Setting next_dagrun for extract_to_mdm_10 to 2022-02-02T00:00:00+08:00
   [2022-03-14 18:15:35,250] {dag.py:2937} INFO - Setting next_dagrun for extract_to_mdm_10 to 2022-02-02T00:00:00+08:00 ………………
   ```
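
A possible explanation for the repeated `next_dagrun` value in the log above, offered as a sketch rather than a confirmed diagnosis: if the scheduler hands `last_automated_data_interval.start` back in UTC (an assumption here), then `(last_start + delta).date()` is the UTC calendar date, and stamping that date with CST midnight can land on the same instant as the previous run. The snippet below reproduces that arithmetic with the standard library only; the helper names and the fixed +08:00 offset standing in for Asia/Shanghai are illustrative and not part of Airflow or of this report.

```python
from datetime import datetime, time, timedelta, timezone

UTC = timezone.utc
# Fixed +08:00 offset used as a stand-in for Asia/Shanghai (assumption for this sketch).
CST = timezone(timedelta(hours=8))


def next_start_as_written(last_start: datetime) -> datetime:
    """Mirror the timetable's weekday branch: add a day, take .date(), stamp CST midnight."""
    return datetime.combine((last_start + timedelta(days=1)).date(), time.min, tzinfo=CST)


def next_start_localized(last_start: datetime) -> datetime:
    """Same arithmetic, but convert to CST before taking the calendar date."""
    local = last_start.astimezone(CST)
    return datetime.combine((local + timedelta(days=1)).date(), time.min, tzinfo=CST)


# 2022-02-02 00:00 CST expressed in UTC (assumed form of the value passed back by the scheduler).
last_start = datetime(2022, 2, 1, 16, 0, tzinfo=UTC)

stuck = next_start_as_written(last_start)
advanced = next_start_localized(last_start)

print(stuck.isoformat())     # same instant as last_start, so the schedule never moves
print(advanced.isoformat())  # one CST day later
```

Under that assumption, converting to CST before taking `.date()` (in pendulum, `last_start.in_timezone(CST)`) would let the schedule advance. With `CST = timezone("UTC")` the UTC date and the local date already agree, which would be consistent with the working case.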
   But if I use UTC, the backfill runs as I need:
   
![WeCom screenshot_16472545233476](https://user-images.githubusercontent.com/75414662/158156525-bad8ae58-30b6-4328-a90a-4b3a483be0f8.png)
   
   This is my DAG code:
   ```
   # -*- coding: utf-8 -*-
   
   from datetime import datetime, timedelta
   import pendulum
   from airflow.operators.python_operator import PythonOperator
   
   from delayoneday import AfterWorkdayTimetable  # Uses CST; otherwise the same as the example
   from timetabledemo import demoTimetable  # Uses UTC; the same as the example
   from airflow import DAG
   
   local_tz = pendulum.timezone("Asia/Shanghai")
   
   default_args = {
       'owner': 'airflow',
       'start_date': pendulum.datetime(year=2022, month=2, day=1, hour=10, tz=local_tz),
       'end_date': pendulum.datetime(year=2099, month=2, day=1, hour=10, tz=local_tz),
       'retries': 3,
       'retry_delay': timedelta(minutes=5)
   }   
   
   dag3 = DAG(
       dag_id='extract_to_mdm_10',
       default_args=default_args,
       timetable=AfterWorkdayTimetable(),
       catchup= True
   )  
   
   
   dag4 = DAG(
       dag_id='extract_to_mdm_9',
       default_args=default_args,
       timetable=demoTimetable(),
       catchup= True
   )  
   
   
   def printtest(**kwargs):
       print(kwargs)
       execution_date=kwargs.get('logical_date').in_timezone(local_tz)
       print(execution_date)
   
   
   t4 = PythonOperator(task_id='pushtest4', python_callable=printtest, provide_context=True, dag=dag3)
   
   t5 = PythonOperator(task_id='pushtest4', python_callable=printtest, provide_context=True, dag=dag4)
   
   t4
   
   t5
   ```
   
   
   ### Operating System
   
   CentOS 7
   
   ### Versions of Apache Airflow Providers
   
   2.2.4
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   None
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

