AanchalAgrawal2105 opened a new issue, #49216:
URL: https://github.com/apache/airflow/issues/49216

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.10.3
   
   ### What happened?
   
   We are using a custom timetable to catch up a DAG from its start date in 
chunks (yearly, 6-monthly, monthly, and so on) until a cut-off date, then 
switch to a cron expression for daily runs after the cut-off.
   When we manually mark a DAG run as failed/success from the UI, the running 
task stays green with no updates.
   When we try to mark the task directly as success/failed, the following occurs:
   
   <img width="851" alt="Image" 
src="https://github.com/user-attachments/assets/d7363d0f-e363-4e58-9bf5-91e12d215d76";
 />
   
   Webserver pod restarts with error like 
   ```
   [2025-04-14T10:35:21.297+0000] {main.py:73} INFO - [CustomTimetable] 
Switching to cron-based scheduling
   [2025-04-14T10:35:21.297+0000] {main.py:73} INFO - [CustomTimetable] 
Switching to cron-based scheduling
   [2025-04-14T10:35:21.298+0000] {main.py:75} INFO - [CustomTimetable] 
cron-based next_start: 2261-11-01 05:00:00+00:00, next_end: 2261-11-02 
05:00:00+00:00
   [2025-04-14T10:35:21.298+0000] {main.py:66} INFO - [CustomTimetable] 
last_automated_data_interval: DataInterval(start=datetime.datetime(2261, 11, 1, 
5, 0, tzinfo=Timezone('UTC')), end=datetime.datetime(2261, 11, 2, 5, 0, 
tzinfo=Timezone('UTC')))
   [2025-04-14T10:35:21.298+0000] {main.py:73} INFO - [CustomTimetable] 
Switching to cron-based scheduling
   [2025-04-14T10:35:21.298+0000] {main.py:75} INFO - [CustomTimetable] 
cron-based next_start: 2261-11-02 05:00:00+00:00, next_end: 2261-11-03 
05:00:00+00:00
   [2025-04-14T10:35:21.299+0000] {main.py:66} INFO - [CustomTimetable] 
last_automated_data_interval: DataInterval(start=datetime.datetime(2261, 11, 2, 
5, 0, tzinfo=Timezone('UTC')), end=datetime.datetime(2261, 11, 3, 5, 0, 
tzinfo=Timezone('UTC')))
   [2025-04-14T10:35:21.299+0000] {main.py:73} INFO - [CustomTimetable] 
Switching to cron-based scheduling
   [2025-04-14T10:35:21.299+0000] {main.py:75} INFO - [CustomTimetable] 
cron-based next_start: 2261-11-03 05:00:00+00:00, next_end: 2261-11-04 
05:00:00+00:00
   [2025-04-14T10:35:21.299+0000] {main.py:66} INFO - [CustomTimetable] 
last_automated_data_interval: DataInterval(start=datetime.datetime(2261, 11, 3, 
5, 0, tzinfo=Timezone('UTC')), end=datetime.datetime(2261, 11, 4, 5, 0, 
tzinfo=Timezone('UTC')))
   [2025-04-14T10:35:21.299+0000] {main.py:73} INFO - [CustomTimetable] 
Switching to cron-based scheduling
   [2025-04-14T10:35:21.300+0000] {main.py:75} INFO - [CustomTimetable] 
cron-based next_start: 2261-11-04 05:00:00+00:00, next_end: 2261-11-05 
05:00:00+00:00
   [2025-04-14T10:35:21.300+0000] {main.py:66} INFO - [CustomTimetable] 
last_automated_data_interval: DataInterval(start=datetime.datetime(2261, 11, 4, 
5, 0, tzinfo=Timezone('UTC')), end=datetime.datetime(2261, 11, 5, 5, 0, 
tzinfo=Timezone('UTC')))
   [2025-04-14 10:31:44 +0000] [48] [ERROR] Error handling request 
/confirm?dag_id=slack-commander-standardized.pipeline-question.question_created_timetable.standardized&dag_run_id=scheduled__2025-03-17T00%3A00%3A00%2B00%3A00&past=false&future=false&upstream=false&downstream=false&state=failed&task_id=pipeline-question.question_created_timetable.standardized_submit_spark_job
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.10/site-packages/gunicorn/workers/sync.py", 
line 134, in handle
       self.handle_request(listener, req, client, addr)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/gunicorn/workers/sync.py", 
line 177, in handle_request
       respiter = self.wsgi(environ, resp.start_response)
     File "/home/airflow/.local/lib/python3.10/site-packages/flask/app.py", 
line 2552, in __call__
       return self.wsgi_app(environ, start_response)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/werkzeug/middleware/proxy_fix.py",
 line 187, in __call__
       return self.app(environ, start_response)
     File "/home/airflow/.local/lib/python3.10/site-packages/flask/app.py", 
line 2529, in wsgi_app
       response = self.full_dispatch_request()
     File "/home/airflow/.local/lib/python3.10/site-packages/flask/app.py", 
line 1823, in full_dispatch_request
       rv = self.dispatch_request()
     File "/home/airflow/.local/lib/python3.10/site-packages/flask/app.py", 
line 1799, in dispatch_request
       return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/www/auth.py", line 
250, in decorated
       return _has_access(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/www/auth.py", line 
163, in _has_access
       return func(*args, **kwargs)
   
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/www/decorators.py", 
line 159, in wrapper
       return f(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/www/views.py", line 
2756, in confirm
       to_be_altered = set_state(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/api/common/mark_tasks.py",
 line 141, in set_state
       dag_run_ids = get_run_ids(dag, run_id, future, past, session=session)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", 
line 94, in wrapper
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/api/common/mark_tasks.py",
 line 353, in get_run_ids
       dates = [
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/api/common/mark_tasks.py",
 line 353, in <listcomp>
       dates = [
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dag.py", line 
1293, in iter_dagrun_infos_between
       info = self.timetable.next_dagrun_info(
     File "/opt/airflow/plugins/custom_timetable/main.py", line 74, in 
next_dagrun_info
       next_end = croniter(self._cron_expression, 
next_start).get_next(pendulum.DateTime)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/croniter/croniter.py", line 
292, in get_next
       return self._get_next(ret_type or self._ret_type,
     File 
"/home/airflow/.local/lib/python3.10/site-packages/croniter/croniter.py", line 
396, in _get_next
       result = self._calc(self.cur, expanded,
     File 
"/home/airflow/.local/lib/python3.10/site-packages/croniter/croniter.py", line 
490, in _calc
       dst = now = self._timestamp_to_datetime(now + sign * offset)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/croniter/croniter.py", line 
336, in _timestamp_to_datetime
       result = datetime.datetime.fromtimestamp(timestamp, 
tz=tzutc()).replace(tzinfo=None)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/gunicorn/workers/base.py", 
line 204, in handle_abort
       sys.exit(1)
   SystemExit: 1
   ```
   
   However, we can still update the task status from the task instance list UI.
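
The repeated log lines suggest that, while handling the `/confirm` request, `iter_dagrun_infos_between` keeps calling the timetable's `next_dagrun_info`, which after the cut-off date advances exactly one day per call; the request never finishes, and the gunicorn worker timeout aborts it (`handle_abort` → `SystemExit: 1`). A rough, hypothetical simulation of that walk (plain Python, no Airflow), assuming a daily cron cadence:

```python
# Hypothetical illustration (not Airflow code): after the cut-off date the
# cron branch of the timetable yields one-day intervals, so walking from the
# selected run date to the dates seen in the webserver log takes tens of
# thousands of calls -- far longer than a gunicorn worker timeout allows.
from datetime import datetime, timedelta


def next_interval(last_end: datetime) -> tuple:
    """Mimics the daily cron branch: each interval starts where the last ended."""
    return last_end, last_end + timedelta(days=1)


cur = datetime(2025, 3, 17)     # run date from the failing /confirm request
logged = datetime(2261, 11, 1)  # date appearing in the webserver log
calls = 0
while cur < logged:
    _, cur = next_interval(cur)
    calls += 1
print(calls)  # tens of thousands of next_dagrun_info calls for one request
```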
   
   ### What you think should happen instead?
   
   Marking a DAG run or its tasks as success/failed from the UI should update 
their state without crashing the webserver pod.
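
One plausible direction (an assumption on my part, not a confirmed fix): the `Timetable.next_dagrun_info` contract allows returning `None` when no further run should be scheduled, which is what gives `iter_dagrun_infos_between` a termination condition. A minimal, Airflow-free sketch of such a guard, with hypothetical names:

```python
# Sketch only: a bounds check like this at the top of next_dagrun_info would
# let iteration terminate instead of walking forward indefinitely.
from datetime import datetime, timedelta
from typing import Optional, Tuple


def next_dagrun_info(next_start: datetime,
                     latest: Optional[datetime]) -> Optional[Tuple[datetime, datetime]]:
    # Returning None signals the caller that there is no next run to schedule
    # (the real guard would compare against restriction.latest).
    if latest is not None and next_start > latest:
        return None
    return next_start, next_start + timedelta(days=1)
```

In the actual timetable this check would go before the croniter call, using `restriction.latest` as the bound.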
   
   ### How to reproduce
   
   Custom Timetable implementation:
   
   ```python
   import logging
   from typing import Any, Dict, Optional

   import pendulum
   from croniter import croniter
   from pendulum import date, datetime

   from airflow.plugins_manager import AirflowPlugin
   from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable

   log = logging.getLogger(__name__)


   class CustomTimetableSchedule(Timetable):
       """
       Custom timetable that supports both cron-based scheduling and chunked
       data processing.
       """

       def __init__(self, cron_expression: Optional[str] = None,
                    latest_date: Optional[date] = None,
                    range: Optional[int] = None,
                    max_active_runs: Optional[int] = None):
           """
           Initialize the timetable with an optional cron expression.

           Args:
               cron_expression: A cron expression for scheduling
                   (e.g., "0 0 * * *" for daily at midnight)
               latest_date: The latest date to process data up to
           """
           self._cron_expression = cron_expression
           self._latest_date = latest_date
           self._range = range or 365

       def get_chunk_size(self, diff_days: int, range_allowed: int) -> int:
           if range_allowed == 365 and diff_days >= 365:
               return 365
           if diff_days >= 180:
               return 180
           if diff_days >= 30:
               return 30
           return 1

       def infer_manual_data_interval(self, run_after: datetime) -> DataInterval:
           delta = pendulum.duration(days=1)
           cron = croniter(self._cron_expression, run_after - delta)
           start = cron.get_prev(pendulum.DateTime)
           end = cron.get_next(pendulum.DateTime)
           return DataInterval(start=start, end=end)

       def next_dagrun_info(self, last_automated_data_interval: Optional[DataInterval],
                            restriction: TimeRestriction) -> DagRunInfo:
           """
           Handle both cron-based scheduling and chunked data processing.

           Args:
               last_automated_data_interval: The last automated DAG run interval
               restriction: Time restriction for the DAG run

           Returns:
               DagRunInfo: Information about the next DAG run
           """
           latest_datetime = pendulum.datetime(
               self._latest_date.year,
               self._latest_date.month,
               self._latest_date.day,
               0, 0, 0, tz="UTC",
           )
           if last_automated_data_interval:
               log.info(f"[CustomTimetable] last_automated_data_interval: {last_automated_data_interval}")
               next_start = last_automated_data_interval.end
           else:
               log.info(f"[CustomTimetable] restriction.earliest: {restriction.earliest}")
               next_start = restriction.earliest

           if next_start >= latest_datetime:
               log.info("[CustomTimetable] Switching to cron-based scheduling")
               next_end = croniter(self._cron_expression, next_start).get_next(pendulum.DateTime)
               log.info(f"[CustomTimetable] cron-based next_start: {next_start}, next_end: {next_end}")
               return DagRunInfo.interval(start=next_start, end=next_end)

           diff_days = (latest_datetime - next_start).days
           log.info(f"[CustomTimetable] diff_days B: {diff_days}")

           chunk_size = self.get_chunk_size(diff_days, range_allowed=self._range)

           next_end = next_start.add(days=chunk_size)
           log.info(f"[CustomTimetable] Backfill chunk: {next_start} to {next_end} ({chunk_size} days)")

           return DagRunInfo.interval(start=next_start, end=next_end)

       @property
       def summary(self) -> str:
           return f"After latest_date: {self._latest_date} schedule is {self._cron_expression}"

       def serialize(self) -> Dict[str, Any]:
           return {
               "cron_expression": self._cron_expression,
               "latest_date": self._latest_date.isoformat(),
               "range": self._range,
           }

       @classmethod
       def deserialize(cls, data: Dict[str, Any]) -> Timetable:
           latest_date = pendulum.parse(data["latest_date"]).date()
           log.info(f"[CustomTimetable] latest_date_deserialised: {latest_date}")
           return cls(
               cron_expression=data["cron_expression"],
               latest_date=latest_date,
               range=data["range"],
           )


   class CustomTimetableSchedulePlugin(AirflowPlugin):
       """
       Airflow plugin that registers the custom timetable.
       """
       name = "timetable_plugin"
       timetables = [CustomTimetableSchedule]
   ```

   Use the above to run a sample DAG, then try to change the status of the 
running tasks before they succeed.
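
For clarity, the catch-up chunk selection above, extracted as a standalone function (same logic as `get_chunk_size` in the timetable: yearly, then 6-monthly, then monthly, then daily chunks):

```python
def get_chunk_size(diff_days: int, range_allowed: int = 365) -> int:
    """Pick the catch-up chunk length in days based on how far behind we are."""
    if range_allowed == 365 and diff_days >= 365:
        return 365  # a full year at a time
    if diff_days >= 180:
        return 180  # six months
    if diff_days >= 30:
        return 30   # one month
    return 1        # daily once close to the cut-off
```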
   
   ### Operating System
   
   Kubernetes
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon = "^8.19.0"
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
