Ferdinanddb opened a new issue, #58560:
URL: https://github.com/apache/airflow/issues/58560

   ### Apache Airflow version
   
   3.1.3
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I have two DAGs using the new [deadline alerts](https://airflow.apache.org/docs/apache-airflow/stable/howto/deadline-alerts.html) feature. I ran a simple test before adding the deadline to these DAGs, and it worked fine.
   
   Here is an example of a DAG I have with a deadline alert:
   ```python
   with DAG(
       dag_id=Path(__file__).parent.name,
       dag_display_name="🟡 " + ICEBERG_FULL_TABLE_REF,
       schedule="0 0 * * *",
       default_args=default_args,
       tags={
           "some",
           "tags",
       },
       max_active_runs=1,
       catchup=False,
       deadline=DeadlineAlert(
           reference=DeadlineReference.DAGRUN_QUEUED_AT,
           interval=timedelta(hours=10),
           callback=AsyncCallback(
               callback_callable=SlackWebhookNotifier,
               kwargs={
                   "slack_webhook_conn_id": "slack_default",
                   "text": "some text",
               },
           ),
       ),
       on_failure_callback=default_dag_failure_slack_webhook_notification,
   ) as dag:
       ...  # some Airflow tasks
   ```
   
   The scheduler pod is crashing (`CrashLoopBackOff` state) with the following error:
   
   ```
   2025-11-21T08:59:50.012752Z [info     ] Exited execute loop            [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1094
   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 7, in <module>
       sys.exit(main())
                ^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
       args.func(args)
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 52, in scheduler
       run_command_with_daemon_option(
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
       callback()
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 55, in <lambda>
       callback=lambda: _run_scheduler_job(args),
                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 368, in run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 397, in execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1078, in _execute
       self._run_scheduler_loop()
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1368, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1478, in _do_scheduling
       callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/retries.py", line 97, in wrapped_function
       for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
       do = self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 378, in iter
       result = action(retry_state)
                ^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 400, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
                                        ^^^^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 449, in result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
       raise self._exception
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/retries.py", line 106, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1888, in _schedule_all_dag_runs
       callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 2004, in _schedule_dag_run
       schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 98, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dagrun.py", line 1231, in update_state
       Deadline.prune_deadlines(session=session, conditions={DagRun.run_id: self.run_id})
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/deadline.py", line 182, in prune_deadlines
       if dagrun.end_date <= deadline.deadline_time:
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   TypeError: '<=' not supported between instances of 'NoneType' and 'datetime.datetime'
   stream closed: EOF for airflow/airflow-scheduler-67599768b9-7d9pj (scheduler)
   ```
   
   From the traceback, the comparison fails because `dagrun.end_date` is still `None` when `Deadline.prune_deadlines` runs. The only workaround I have found so far is to comment out the deadline logic in the DAG, so that it no longer has a `DeadlineAlert`.
   
   
   ### What you think should happen instead?
   
   The scheduler pod should not crash. This is dangerous because one DAG can impact all the others: a single DAG crashing the scheduler prevents every other DAG from being scheduled.
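
   A None-safe check around that comparison would avoid the crash. The snippet below is only a toy sketch of the failing comparison and a possible guard, not the actual `prune_deadlines` implementation:

   ```python
   # Toy demonstration (not Airflow code): why the comparison raises, and a possible guard.
   from datetime import datetime, timezone

   end_date = None  # a DagRun that is still running has no end_date yet
   deadline_time = datetime(2025, 11, 21, 10, 0, tzinfo=timezone.utc)

   # The line from airflow/models/deadline.py quoted in the traceback does the equivalent of
   #     if end_date <= deadline_time:   # TypeError when end_date is None
   # A None-safe version (my suggestion only) would skip unfinished runs instead of crashing:
   if end_date is not None and end_date <= deadline_time:
       print("prune the deadline")
   else:
       print("DagRun not finished yet; leave the deadline alone")
   ```

   Even with such a guard, it would be good if an error raised while scheduling one DAG run were contained, so that a single DAG cannot keep the scheduler in a crash loop.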
   
   ### How to reproduce
   
   I don't have exact reproduction steps, but my DAG is fairly basic:
   
   ```python
   from datetime import datetime, timedelta
   from pathlib import Path
   
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
   from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
   from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
   from airflow.sdk import DAG, Asset, TaskGroup
   from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference
   from airflow_richfox.custom.utils.gcs import upload_spark_config_to_gcs
   from airflow_richfox.custom.utils.slack import (
       deadline_exceeded_slack_text_message,
       default_dag_failure_slack_webhook_notification,
   )
   
   ICEBERG_CATALOG_NAME = "biglakeCatalog"
   ICEBERG_TABLE_REF = "gold.some_table"
   ICEBERG_FULL_TABLE_REF = f"{ICEBERG_CATALOG_NAME}.{ICEBERG_TABLE_REF}"
   
   # Define the Asset for this DAG
   asset_table_gold_algoseek_sec_master_full_listed_current = Asset(f"x-iceberg://{ICEBERG_FULL_TABLE_REF}")
   
   default_args = {
       "owner": "airflow",
       "depends_on_past": False,
       "email_on_failure": False,
       "email_on_retry": False,
       "retries": 4,
       "start_date": datetime(2024, 4, 29),
       "retry_delay": timedelta(minutes=4),
   }
   
   # Update your DAG to use parallel processing
   with DAG(
       dag_id=Path(__file__).parent.name,
       dag_display_name="🟡 " + ICEBERG_FULL_TABLE_REF,
       schedule="0 0 * * *",
       default_args=default_args,
       tags={
           "some",
           "gold",
           "tag",
       },
       max_active_runs=1,
       catchup=False,
       deadline=DeadlineAlert(
           reference=DeadlineReference.DAGRUN_QUEUED_AT,
           interval=timedelta(hours=10),
           callback=AsyncCallback(
               callback_callable=SlackWebhookNotifier,
               kwargs={
                   "slack_webhook_conn_id": "slack_default",
                   "text": deadline_exceeded_slack_text_message,
               },
           ),
       ),
       on_failure_callback=default_dag_failure_slack_webhook_notification,
   ) as dag:
       with TaskGroup("wait_for_source_silver_dags") as wait_for_source_silver_dags:
           for source_dagname in [
               "silver_dag1",
               "silver_dag2",
               "silver_dag3",
               "silver_dag4",
               "silver_dag5",
               "silver_dag6",
           ]:
               wait_for_dag = ExternalTaskSensor(
                   task_id=f"wait_for_dag_{source_dagname}",
                   external_dag_id=source_dagname,
                   timeout=60 * 60 * 12,  # 12 hours
                   poll_interval=60 * 3.0,  # 3 minutes
                   deferrable=True,
                   soft_fail=False,
                   execution_date_fn=lambda dt: dt,
               )
   
       full_reload_job = SparkKubernetesOperator(
           task_id="full_reload_job",
           namespace="spark-operator",
           application_file="spark_app/full_reload_job/spark_application_config.yml",
           kubernetes_conn_id="kubernetes_default",
           random_name_suffix=True,
           get_logs=True,
           reattach_on_restart=True,
           delete_on_termination=False,
           do_xcom_push=False,
           deferrable=True,
           base_container_status_polling_interval=60,
           on_execute_callback=upload_spark_config_to_gcs,
           outlets=[asset_table_gold_algoseek_sec_master_full_listed_current],
       )
   
       maintenance_job = SparkKubernetesOperator(
           task_id="maintenance_job",
           namespace="spark-operator",
           application_file="spark_app/maintenance_job/spark_application_config.yml",
           kubernetes_conn_id="kubernetes_default",
           random_name_suffix=True,
           get_logs=True,
           reattach_on_restart=True,
           delete_on_termination=False,
           do_xcom_push=False,
           deferrable=True,
           on_execute_callback=upload_spark_config_to_gcs,
       )
   
       wait_for_source_silver_dags >> full_reload_job >> maintenance_job
   
   ```
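
   If it helps narrow things down, here is a stripped-down DAG (untested; the dag_id, task and Slack text are placeholders) that I would expect to hit the same code path, since the only relevant piece seems to be the `DeadlineAlert` itself:

   ```python
   # Hypothetical minimal reproduction, untested. Everything except the DeadlineAlert
   # is stripped down; the dag_id and the Slack message are placeholders.
   from datetime import datetime, timedelta

   from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
   from airflow.providers.standard.operators.empty import EmptyOperator
   from airflow.sdk import DAG
   from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference

   with DAG(
       dag_id="deadline_crash_repro",
       schedule="0 0 * * *",
       start_date=datetime(2024, 4, 29),
       catchup=False,
       deadline=DeadlineAlert(
           reference=DeadlineReference.DAGRUN_QUEUED_AT,
           interval=timedelta(hours=10),
           callback=AsyncCallback(
               callback_callable=SlackWebhookNotifier,
               kwargs={"slack_webhook_conn_id": "slack_default", "text": "deadline exceeded"},
           ),
       ),
   ):
       EmptyOperator(task_id="noop")
   ```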
   
   ### Operating System
   
   Official Airflow image: docker.io/apache/airflow:3.1.1-python3.12
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   I use the official constraint file to install dependencies.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

