alvinzhangdd commented on PR #54775:
URL: https://github.com/apache/airflow/pull/54775#issuecomment-3221948832
@abhishekbhakat
Please check the google doc I shared with you in the channel. I did a local
testing with
* Patch 1
* Only add try block to scheduler_job_runner.
* Patch 2
* Add try block to scheduler_job_runner.
* Skip deserializing `_date` fields which are not int/float.
I used a malformed dag with task level param start_date, end_date using
jinjia template.
* Patch 1
* When unpause dag, scheduler will not crash, just cannot create dag run
for the dag.
* Scheduler shows error log on create dag run
* Patch 2
* When unpause dag, scheduler will crash. The error is task.end_date is
None
* Reason is that the skip deserialization made end_date = None.
So if skipping the deserialization on _date, it will crash the scheduler as
well.
```
scheduler-1 | [2025-08-22T21:21:53.314+0000]
{scheduler_job_runner.py:1196} ERROR - Failed creating DagRun for foo_bar_dag
scheduler-1 | Traceback (most recent call last):
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1180, in _create_dag_runs
scheduler-1 | dag.create_dagrun(
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in
wrapper
scheduler-1 | return func(*args, **kwargs)
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/airflow/models/dag.py", line 3039, in
create_dagrun
scheduler-1 | run.verify_integrity(session=session)
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in
wrapper
scheduler-1 | return func(*args, **kwargs)
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/airflow/models/dagrun.py", line 1206,
in verify_integrity
scheduler-1 | self._create_task_instances(self.dag_id,
tis_to_create, created_counts, hook_is_noop, session=session)
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/airflow/models/dagrun.py", line 1394,
in _create_task_instances
scheduler-1 | session.bulk_insert_mappings(TI, tasks)
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3810,
in bulk_insert_mappings
scheduler-1 | self._bulk_save_mappings(
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3913,
in _bulk_save_mappings
scheduler-1 | transaction.rollback(_capture_exception=True)
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line
70, in __exit__
scheduler-1 | compat.raise_(
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211,
in raise_
scheduler-1 | raise exception
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3901,
in _bulk_save_mappings
scheduler-1 | persistence._bulk_insert(
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line
72, in _bulk_insert
scheduler-1 | mappings = list(mappings)
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/airflow/models/dagrun.py", line 1355,
in _create_tasks
scheduler-1 | for task in tasks:
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/airflow/models/dagrun.py", line 1204,
in <genexpr>
scheduler-1 | tasks_to_create = (task for task in
dag.task_dict.values() if task_filter(task))
scheduler-1 | File
"/usr/local/lib/python3.8/site-packages/airflow/models/dagrun.py", line 1197,
in task_filter
scheduler-1 | and (task.end_date is None or self.execution_date <=
task.end_date)
scheduler-1 | TypeError: '<=' not supported between instances of
'DateTime' and 'str'
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]