AanchalAgrawal2105 opened a new issue, #49216: URL: https://github.com/apache/airflow/issues/49216
### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

2.10.3

### What happened?

We are using a custom timetable to catch a DAG up from its start date in chunks (yearly, six-monthly, monthly, and so on) until a cut-off date, after which a cron expression takes over for daily runs. When we manually mark a DAG run as failed/success from the UI, the running task stays green with no updates. When we try to mark the task directly as success/failed, the following occurs:

<img width="851" alt="Image" src="https://github.com/user-attachments/assets/d7363d0f-e363-4e58-9bf5-91e12d215d76" />

The webserver pod restarts with an error like:

```
[2025-04-14T10:35:21.297+0000] {main.py:73} INFO - [CustomTimetable] Switching to cron-based scheduling
[2025-04-14T10:35:21.297+0000] {main.py:73} INFO - [CustomTimetable] Switching to cron-based scheduling
[2025-04-14T10:35:21.298+0000] {main.py:75} INFO - [CustomTimetable] cron-based next_start: 2261-11-01 05:00:00+00:00, next_end: 2261-11-02 05:00:00+00:00
[2025-04-14T10:35:21.298+0000] {main.py:66} INFO - [CustomTimetable] last_automated_data_interval: DataInterval(start=datetime.datetime(2261, 11, 1, 5, 0, tzinfo=Timezone('UTC')), end=datetime.datetime(2261, 11, 2, 5, 0, tzinfo=Timezone('UTC')))
[2025-04-14T10:35:21.298+0000] {main.py:73} INFO - [CustomTimetable] Switching to cron-based scheduling
[2025-04-14T10:35:21.298+0000] {main.py:75} INFO - [CustomTimetable] cron-based next_start: 2261-11-02 05:00:00+00:00, next_end: 2261-11-03 05:00:00+00:00
[2025-04-14T10:35:21.299+0000] {main.py:66} INFO - [CustomTimetable] last_automated_data_interval: DataInterval(start=datetime.datetime(2261, 11, 2, 5, 0, tzinfo=Timezone('UTC')), end=datetime.datetime(2261, 11, 3, 5, 0, tzinfo=Timezone('UTC')))
[2025-04-14T10:35:21.299+0000] {main.py:73} INFO - [CustomTimetable] Switching to cron-based scheduling
[2025-04-14T10:35:21.299+0000] {main.py:75} INFO - [CustomTimetable] cron-based next_start: 2261-11-03 05:00:00+00:00, next_end: 2261-11-04 05:00:00+00:00
[2025-04-14T10:35:21.299+0000] {main.py:66} INFO - [CustomTimetable] last_automated_data_interval: DataInterval(start=datetime.datetime(2261, 11, 3, 5, 0, tzinfo=Timezone('UTC')), end=datetime.datetime(2261, 11, 4, 5, 0, tzinfo=Timezone('UTC')))
[2025-04-14T10:35:21.299+0000] {main.py:73} INFO - [CustomTimetable] Switching to cron-based scheduling
[2025-04-14T10:35:21.300+0000] {main.py:75} INFO - [CustomTimetable] cron-based next_start: 2261-11-04 05:00:00+00:00, next_end: 2261-11-05 05:00:00+00:00
[2025-04-14T10:35:21.300+0000] {main.py:66} INFO - [CustomTimetable] last_automated_data_interval: DataInterval(start=datetime.datetime(2261, 11, 4, 5, 0, tzinfo=Timezone('UTC')), end=datetime.datetime(2261, 11, 5, 5, 0, tzinfo=Timezone('UTC')))
[2025-04-14 10:31:44 +0000] [48] [ERROR] Error handling request /confirm?dag_id=slack-commander-standardized.pipeline-question.question_created_timetable.standardized&dag_run_id=scheduled__2025-03-17T00%3A00%3A00%2B00%3A00&past=false&future=false&upstream=false&downstream=false&state=failed&task_id=pipeline-question.question_created_timetable.standardized_submit_spark_job
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/gunicorn/workers/sync.py", line 134, in handle
    self.handle_request(listener, req, client, addr)
  File "/home/airflow/.local/lib/python3.10/site-packages/gunicorn/workers/sync.py", line 177, in handle_request
    respiter = self.wsgi(environ, resp.start_response)
  File "/home/airflow/.local/lib/python3.10/site-packages/flask/app.py", line 2552, in __call__
    return self.wsgi_app(environ, start_response)
  File "/home/airflow/.local/lib/python3.10/site-packages/werkzeug/middleware/proxy_fix.py", line 187, in __call__
    return self.app(environ, start_response)
  File "/home/airflow/.local/lib/python3.10/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.10/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/.local/lib/python3.10/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/www/auth.py", line 250, in decorated
    return _has_access(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/www/auth.py", line 163, in _has_access
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/www/decorators.py", line 159, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/www/views.py", line 2756, in confirm
    to_be_altered = set_state(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/api/common/mark_tasks.py", line 141, in set_state
    dag_run_ids = get_run_ids(dag, run_id, future, past, session=session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 94, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/api/common/mark_tasks.py", line 353, in get_run_ids
    dates = [
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/api/common/mark_tasks.py", line 353, in <listcomp>
    dates = [
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dag.py", line 1293, in iter_dagrun_infos_between
    info = self.timetable.next_dagrun_info(
  File "/opt/airflow/plugins/custom_timetable/main.py", line 74, in next_dagrun_info
    next_end = croniter(self._cron_expression, next_start).get_next(pendulum.DateTime)
  File "/home/airflow/.local/lib/python3.10/site-packages/croniter/croniter.py", line 292, in get_next
    return self._get_next(ret_type or
self._ret_type,
  File "/home/airflow/.local/lib/python3.10/site-packages/croniter/croniter.py", line 396, in _get_next
    result = self._calc(self.cur, expanded,
  File "/home/airflow/.local/lib/python3.10/site-packages/croniter/croniter.py", line 490, in _calc
    dst = now = self._timestamp_to_datetime(now + sign * offset)
  File "/home/airflow/.local/lib/python3.10/site-packages/croniter/croniter.py", line 336, in _timestamp_to_datetime
    result = datetime.datetime.fromtimestamp(timestamp, tz=tzutc()).replace(tzinfo=None)
  File "/home/airflow/.local/lib/python3.10/site-packages/gunicorn/workers/base.py", line 204, in handle_abort
    sys.exit(1)
SystemExit: 1
```

We can still update the task status from the task list UI.

### What you think should happen instead?

Marking a DAG run or its tasks as success/failed from the UI should update the task states without crashing the webserver pod.

### How to reproduce

Custom timetable implementation:

```python
import pendulum
import logging
from pendulum import datetime, duration, date, Time
from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable, DataInterval
from airflow.plugins_manager import AirflowPlugin
from croniter import croniter
from typing import Optional, Dict, Any

log = logging.getLogger(__name__)


class CustomTimetableSchedule(Timetable):
    """
    Custom timetable that supports both cron-based scheduling and chunked data processing.
    """

    def __init__(
        self,
        cron_expression: Optional[str] = None,
        latest_date: Optional[date] = None,
        range: Optional[int] = None,
        max_active_runs: Optional[int] = None,
    ):
        """
        Initialize the timetable with an optional cron expression.

        Args:
            cron_expression: A cron expression for scheduling (e.g., "0 0 * * *" for daily at midnight)
            latest_date: The latest date to process data up to
        """
        self._cron_expression = cron_expression
        self._latest_date = latest_date
        self._range = range or 365

    def get_chunk_size(self, diff_days: int, range_allowed: int) -> int:
        if range_allowed == 365 and diff_days >= 365:
            return 365
        if diff_days >= 180:
            return 180
        if diff_days >= 30:
            return 30
        return 1

    def infer_manual_data_interval(self, run_after: datetime) -> DataInterval:
        delta = pendulum.duration(days=1)
        cron = croniter(self._cron_expression, run_after - delta)
        start = cron.get_prev(pendulum.DateTime)
        end = cron.get_next(pendulum.DateTime)
        return DataInterval(start=start, end=end)

    def next_dagrun_info(
        self,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> DagRunInfo:
        """
        Override next_dagrun_info to handle both cron-based scheduling and chunked data processing.

        Args:
            last_automated_data_interval: The last automated DAG run interval
            restriction: Time restriction for the DAG run

        Returns:
            DagRunInfo: Information about the next DAG run
        """
        now = pendulum.now()
        latest_datetime = pendulum.datetime(
            self._latest_date.year, self._latest_date.month, self._latest_date.day, 0, 0, 0, tz="UTC"
        )
        if last_automated_data_interval:
            log.info(f"[CustomTimetable] last_automated_data_interval: {last_automated_data_interval}")
            next_start = last_automated_data_interval.end
        else:
            log.info(f"[CustomTimetable] restriction.earliest: {restriction.earliest}")
            next_start = restriction.earliest
        if next_start >= latest_datetime:
            log.info("[CustomTimetable] Switching to cron-based scheduling")
            next_end = croniter(self._cron_expression, next_start).get_next(pendulum.DateTime)
            log.info(f"[CustomTimetable] cron-based next_start: {next_start}, next_end: {next_end}")
            return DagRunInfo.interval(start=next_start, end=next_end)
        diff_days = (latest_datetime - next_start).days
        log.info(f"[CustomTimetable] diff_days B: {diff_days}")
        chunk_size = self.get_chunk_size(diff_days, range_allowed=self._range)
        next_end = next_start.add(days=chunk_size)
        log.info(f"[CustomTimetable] Backfill chunk: {next_start} to {next_end} ({chunk_size} days)")
        return DagRunInfo.interval(start=next_start, end=next_end)

    @property
    def summary(self) -> str:
        return f"After latest_date: {self._latest_date} schedule is {self._cron_expression}"

    def serialize(self) -> Dict[str, Any]:
        return {
            "cron_expression": self._cron_expression,
            "latest_date": self._latest_date.isoformat(),
            "range": self._range,
        }

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        latest_date = pendulum.parse(data["latest_date"]).date()
        log.info(f"[CustomTimetable] latest_date_deserialised: {latest_date}")
        return cls(
            cron_expression=data["cron_expression"],
            latest_date=latest_date,
            range=data["range"],
        )


class CustomTimetableSchedulePlugin(AirflowPlugin):
    """
    Airflow plugin that registers the custom timetable.
    """

    name = "timetable_plugin"
    timetables = [CustomTimetableSchedule]
```

Use the timetable above with a sample DAG and try to change the status of the running tasks before they succeed.

### Operating System

Kubernetes

### Versions of Apache Airflow Providers

apache-airflow-providers-amazon = "^8.19.0"

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
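For context on the scale involved: the traceback shows `set_state` → `get_run_ids` → `DAG.iter_dagrun_infos_between` calling `next_dagrun_info` in a loop, and the log excerpt shows that loop had reached intervals in the year 2261 before gunicorn aborted the worker (`handle_abort` → `SystemExit: 1`). A rough, Airflow-free sketch of what one `/confirm` request ends up doing when the timetable hands back one-day intervals forever (plain `datetime`; the dates are taken from the logs, and the loop is illustrative, not Airflow code):

```python
from datetime import datetime, timedelta

start = datetime(2025, 3, 17)         # logical date of the run being marked failed
seen_in_logs = datetime(2261, 11, 1)  # interval the iteration had reached when the worker died

# Each pass stands in for one next_dagrun_info() call returning a one-day interval.
steps = 0
current = start
while current < seen_in_logs:
    current += timedelta(days=1)
    steps += 1

print(steps)  # tens of thousands of one-day intervals for a single web request
```

That many croniter evaluations inside one synchronous gunicorn request comfortably exceeds the webserver's worker timeout, which is why the pod appears to crash.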

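My reading of why the iteration never terminates (an assumption based on how the bundled timetables behave, not a verified statement of the official contract): Airflow's built-in timetables such as `CronDataIntervalTimetable` return `None` from `next_dagrun_info` once the candidate run falls outside the `TimeRestriction`, and that `None` is what stops `DAG.iter_dagrun_infos_between`. The custom timetable in the reproduction above always returns a `DagRunInfo` and never consults `restriction.latest`. A standalone sketch of the missing guard, using stand-in types so it runs without Airflow installed (`TimeRestrictionStub` and `next_interval` are illustrative names, not Airflow APIs):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, Tuple


@dataclass
class TimeRestrictionStub:
    """Stand-in for airflow.timetables.base.TimeRestriction."""
    earliest: Optional[datetime]
    latest: Optional[datetime]


def next_interval(
    last_end: Optional[datetime], restriction: TimeRestrictionStub
) -> Optional[Tuple[datetime, datetime]]:
    """Daily intervals, but with the bounds check the custom timetable lacks."""
    next_start = last_end if last_end is not None else restriction.earliest
    if next_start is None:
        return None
    # The missing guard: signal "no more runs" once past the allowed window.
    if restriction.latest is not None and next_start > restriction.latest:
        return None
    return next_start, next_start + timedelta(days=1)


# With the guard in place, iterating over a bounded window terminates on its own.
restriction = TimeRestrictionStub(datetime(2025, 3, 1), datetime(2025, 3, 17))
runs, last_end = [], None
while (interval := next_interval(last_end, restriction)) is not None:
    runs.append(interval)
    last_end = interval[1]
print(len(runs))  # 17 one-day intervals, then the loop stops
```

Adding an equivalent check at the top of `next_dagrun_info` in the custom timetable (return `None` when `next_start` is `None` or beyond `restriction.latest`) should let the `/confirm` endpoint finish instead of walking to 2261; I have not yet verified this against 2.10.3.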