tirkarthi opened a new issue, #56707:
URL: https://github.com/apache/airflow/issues/56707

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   As part of the Airflow 3 migration using offline SQL, we resorted to setting `conf` to `NULL`, since it was not required. We noticed that clearing dag runs migrated with `conf` as `NULL` causes the scheduler to crash, since `None` is not a valid type for `conf` in the datamodel. The column is nullable, and other datamodels accept `None`.
   
   
https://github.com/apache/airflow/blob/c28b21178b9c8c299a8ec5c74eff39d7e1da99b9/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py#L81
   
   `None` is also not accepted here:
   
   
https://github.com/apache/airflow/blob/c28b21178b9c8c299a8ec5c74eff39d7e1da99b9/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py#L301
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   1. Trigger an example dag like `example_callback`.
   2. Set `conf` to `NULL` in the DB, since the column is nullable.
   
   ```
   sqlite> update dag_run set conf=NULL where dag_id = "example_callback";
   sqlite> select * from dag_run where dag_id = "example_callback";
                         id = 8
                     dag_id = example_callback
                  queued_at = 2025-10-16 08:02:14.770940
               logical_date = 2025-10-16 08:02:13.000000
                 start_date = 2025-10-16 08:02:15.161386
                   end_date = 2025-10-16 08:02:18.564532
                      state = success
                     run_id = manual__2025-10-16T08:02:13+00:00
            creating_job_id = 
                   run_type = manual
               triggered_by = UI
       triggering_user_name = admin
                       conf = 
        data_interval_start = 2025-10-16 08:02:13.000000
          data_interval_end = 2025-10-16 08:02:13.000000
                  run_after = 2025-10-16 08:02:13.000000
   last_scheduling_decision = 2025-10-16 08:02:18.561214
            log_template_id = 1
                 updated_at = 2025-10-16 08:02:18.565788
               clear_number = 0
                backfill_id = 
             bundle_version = 
        scheduled_by_job_id = 15
            context_carrier = {"__var": {}, "__type": "dict"}
                span_status = ended
     created_dag_version_id = 01999970192a77c383c76f3b7ac653f1
   ```
   
   ```
    [2025-10-16T08:04:05.714515Z] {local_executor.py:226} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
    [2025-10-16T08:04:05.737568Z] {scheduler_job_runner.py:1068} INFO - Exited execute loop
    Traceback (most recent call last):
      File "/home/karthikeyan/stuff/python/airflow/.venv/bin/airflow", line 10, in <module>
        sys.exit(main())
                 ^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
        args.func(args)
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/cli_config.py", line 49, in command
        return func(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/cli.py", line 115, in wrapper
        return f(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
        return func(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 52, in scheduler
        run_command_with_daemon_option(
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
        callback()
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 55, in <lambda>
        callback=lambda: _run_scheduler_job(args),
                         ^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
        run_job(job=job_runner.job, execute_callable=job_runner._execute)
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
        return func(*args, session=session, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/job.py", line 368, in run_job
        return execute_job(job, execute_callable=execute_callable)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/job.py", line 397, in execute_job
        ret = execute_callable()
              ^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1052, in _execute
        self._run_scheduler_loop()
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1342, in _run_scheduler_loop
        num_queued_tis = self._do_scheduling(session)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1452, in _do_scheduling
        callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/retries.py", line 97, in wrapped_function
        for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
      File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 445, in __iter__
        do = self.iter(retry_state=retry_state)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 378, in iter
        result = action(retry_state)
                 ^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 400, in <lambda>
        self._add_action_func(lambda rs: rs.outcome.result())
                                         ^^^^^^^^^^^^^^^^^^^
      File "/usr/lib/python3.11/concurrent/futures/_base.py", line 449, in result
        return self.__get_result()
               ^^^^^^^^^^^^^^^^^^^
      File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
        raise self._exception
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/retries.py", line 106, in wrapped_function
        return func(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1860, in _schedule_all_dag_runs
        callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1860, in <listcomp>
        callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1976, in _schedule_dag_run
        schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
                                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/session.py", line 98, in wrapper
        return func(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/models/dagrun.py", line 1197, in update_state
        context_from_server=DagRunContext(
                            ^^^^^^^^^^^^^^
      File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/pydantic/main.py", line 253, in __init__
        validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    pydantic_core._pydantic_core.ValidationError: 1 validation error for DagRunContext
    dag_run.conf
      Input should be a valid dictionary [type=dict_type, input_value=None, input_type=NoneType]
        For further information visit https://errors.pydantic.dev/2.11/v/dict_type
   INFO:     Shutting down
   INFO:     Waiting for application shutdown.
   INFO:     Application shutdown complete.
   INFO:     Finished server process [10591]
   ```
   
   ### Operating System
   
   Ubuntu 20.04
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

