tirkarthi opened a new issue, #47808:
URL: https://github.com/apache/airflow/issues/47808

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Following stack trace is seen on trying to execute the attached dag.
   
   ```
   [2025-03-15 15:30:21 +0530] [27911] [INFO] Worker exiting (pid: 27911)
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/utils/providers_configuration_loader.py",
 line 55, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py",
 line 52, in scheduler
       run_command_with_daemon_option(
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/daemon_utils.py",
 line 86, in run_command_with_daemon_option
       callback()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py",
 line 55, in <lambda>
       callback=lambda: _run_scheduler_job(args),
                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py",
 line 43, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", 
line 101, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 
342, in run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 
371, in execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", 
line 937, in _execute
       self._run_scheduler_loop()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", 
line 1063, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", 
line 1163, in _do_scheduling
       callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/retries.py", 
line 93, in wrapped_function
       for attempt in run_with_db_retries(max_retries=retries, logger=logger, 
**retry_kwargs):
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py",
 line 443, in __iter__
       do = self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py",
 line 376, in iter
       result = action(retry_state)
                ^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py",
 line 398, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
                                        ^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/concurrent/futures/_base.py", line 449, in result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in 
__get_result
       raise self._exception
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/retries.py", 
line 102, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", 
line 1569, in _schedule_all_dag_runs
       callback_tuples = [(run, self._schedule_dag_run(run, session=session)) 
for run in dag_runs]
                         
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", 
line 1569, in <listcomp>
       callback_tuples = [(run, self._schedule_dag_run(run, session=session)) 
for run in dag_runs]
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", 
line 1667, in _schedule_dag_run
       schedulable_tis, callback_to_run = dag_run.update_state(session=session, 
execute_callbacks=False)
                                          
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", 
line 98, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/models/dagrun.py", 
line 943, in update_state
       info = self.task_instance_scheduling_decisions(session)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", 
line 98, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/models/dagrun.py", 
line 1123, in task_instance_scheduling_decisions
       schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
                                                          ^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/models/dagrun.py", 
line 1222, in _get_ready_tis
       if not schedulable.are_dependencies_met(session=session, 
dep_context=dep_context):
              
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", 
line 98, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/models/taskinstance.py", line 
2335, in are_dependencies_met
       for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, 
session=session):
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/models/taskinstance.py", line 
2359, in get_failed_dep_statuses
       for dep_status in dep.get_dep_statuses(self, session, dep_context):
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/ti_deps/deps/base_ti_dep.py", 
line 116, in get_dep_statuses
       yield from self._get_dep_statuses(ti, session, cxt)
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/ti_deps/deps/not_previously_skipped_dep.py",
 line 56, in _get_dep_statuses
       if parent.inherits_from_skipmixin:
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   AttributeError: 'SerializedBaseOperator' object has no attribute 
'inherits_from_skipmixin'
   [2025-03-15 15:30:21 +0530] [27910] [INFO] Shutting down: Master
   ```
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   Following dag file crashes the scheduler 
   
   ```python
   DOC = """# Docs for the dag
   
   * 1
   * 2
   
   *bold* _italics_
   """
   
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.decorators import task
   
   from datetime import timedelta
   
   
   with DAG(
       dag_id="retry_issue_dag",
       start_date=datetime(2023, 10, 10),
       catchup=False,
       schedule="@once",
       doc_md=DOC,
   ) as dag:
   
       @task(retries=8, retry_delay=timedelta(seconds=1))
       def retry_less_than_10():
           raise Exception("fail")
   
       @task(retries=40, retry_delay=timedelta(seconds=1))
       def retry_more_than_10():
           raise Exception("fail")
   
       retry_less_than_10()
       retry_more_than_10()
   ```
   
   ### Operating System
   
   Ubuntu 20.04
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to