stephen-bracken opened a new issue, #60408:
URL: https://github.com/apache/airflow/issues/60408

   ### Apache Airflow version
   
   Other Airflow 3 version (please specify below)
   
   ### If "Other Airflow 3 version" selected, which one?
   
   3.1.3
   
   ### What happened?
   
   There was an error deserializing (what I assume to be) the `start_date` parameter for one of my DAGs. (The traceback goes through `_deserialize_partial_kwargs`, so it may actually be a datetime-typed field in a mapped operator's partial kwargs rather than `start_date` itself.)
   
   Stacktrace:
   ```
   airflow dags reserialize

   /home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py:52 DeprecatedImportWarning: The `airflow.utils.timezone.utcnow` attribute is deprecated. Please use `'airflow.sdk.timezone.utcnow'`.

   2026-01-12T13:37:29.477548Z [warning  ] Skipping masking for a secret as it's too short (<5 chars) [airflow._shared.secrets_masker.secrets_masker] loc=secrets_masker.py:555
   2026-01-12T13:37:29.513573Z [info     ] Registered prom-export [airflow_exporter.prometheus_exporter] loc=prometheus_exporter.py:348
   2026-01-12T13:37:30.033628Z [info     ] Sync 3 DAGs [airflow.serialization.serialized_objects] loc=serialized_objects.py:2893
   2026-01-12T13:37:30.040495Z [info     ] Setting next_dagrun for bare_minimum_dag to None, run_after=None [airflow.models.dag] loc=dag.py:688
   2026-01-12T13:37:30.040972Z [info     ] Setting next_dagrun for dynamic_task_dag to None, run_after=None [airflow.models.dag] loc=dag.py:688
   2026-01-12T13:37:30.041348Z [error    ] Failed to deserialize DAG [airflow.serialization.serialized_objects] loc=serialized_objects.py:3938
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2555, in deserialize_dag
       return cls._deserialize_dag_internal(encoded_dag, client_defaults)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2581, in _deserialize_dag_internal
       deser = SerializedBaseOperator.deserialize_operator(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 1777, in deserialize_operator
       cls.populate_operator(op, encoded_op, client_defaults)
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 1619, in populate_operator
       v = cls._deserialize_partial_kwargs(v, client_defaults)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2149, in _deserialize_partial_kwargs
       deserialized[k] = cls._deserialize_field_value(k, v)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2117, in _deserialize_field_value
       return cls._deserialize_datetime(value) if value is not None else None
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/_shared/timezones/timezone.py", line 302, in from_timestamp
       result = coerce_datetime(dt.datetime.fromtimestamp(timestamp, tz=utc))
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   TypeError: 'str' object cannot be interpreted as an integer

   The above exception was the direct cause of the following exception:

   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 3936, in _real_dag
       return SerializedDAG.from_dict(self.data)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2870, in from_dict
       return cls.deserialize_dag(serialized_obj["dag"], client_defaults)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2561, in deserialize_dag
       raise DeserializationError(dag_id) from err
   airflow.exceptions.DeserializationError: An unexpected error occurred while trying to deserialize Dag 'run_test_dags'

   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2555, in deserialize_dag
       return cls._deserialize_dag_internal(encoded_dag, client_defaults)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2581, in _deserialize_dag_internal
       deser = SerializedBaseOperator.deserialize_operator(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 1777, in deserialize_operator
       cls.populate_operator(op, encoded_op, client_defaults)
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 1619, in populate_operator
       v = cls._deserialize_partial_kwargs(v, client_defaults)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2149, in _deserialize_partial_kwargs
       deserialized[k] = cls._deserialize_field_value(k, v)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2117, in _deserialize_field_value
       return cls._deserialize_datetime(value) if value is not None else None
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/_shared/timezones/timezone.py", line 302, in from_timestamp
       result = coerce_datetime(dt.datetime.fromtimestamp(timestamp, tz=utc))
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   TypeError: 'str' object cannot be interpreted as an integer

   The above exception was the direct cause of the following exception:

   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 7, in <module>
       sys.exit(main())
                ^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/__main__.py", line 55, in main
       args.func(args)
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line 114, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 100, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/dag_command.py", line 691, in dag_reserialize
       sync_bag_to_db(dag_bag, bundle.name, bundle_version=bundle.get_current_version(), session=session)
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 98, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 665, in sync_bag_to_db
       update_dag_parsing_results_in_db(
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 394, in update_dag_parsing_results_in_db
       for attempt in run_with_db_retries(logger=log):
     File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 445, in __iter__
       do = self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 378, in iter
       result = action(retry_state)
                ^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 400, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
                                        ^^^^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.11/concurrent/futures/_base.py", line 449, in result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
       raise self._exception
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 404, in update_dag_parsing_results_in_db
       SerializedDAG.bulk_write_to_db(
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 98, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2901, in bulk_write_to_db
       dag_op.update_dags(orm_dags, parse_duration, session=session)
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 556, in update_dags
       dm.calculate_dagrun_date_fields(dag, last_automated_data_interval)  # type: ignore[arg-type]
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line 680, in calculate_dagrun_date_fields
       next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 3927, in next_dagrun_info
       return self._real_dag.next_dagrun_info(*args, **kwargs)
              ^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.11/functools.py", line 1001, in __get__
       val = self.func(instance)
             ^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 3936, in _real_dag
       return SerializedDAG.from_dict(self.data)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2870, in from_dict
       return cls.deserialize_dag(serialized_obj["dag"], client_defaults)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2561, in deserialize_dag
       raise DeserializationError(dag_id) from err
   airflow.exceptions.DeserializationError: An unexpected error occurred while trying to deserialize Dag 'run_test_dags'
   ```
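
   The root `TypeError` is `datetime.fromtimestamp` being handed a `str` where a numeric timestamp is expected, per the `from_timestamp` frame above. A minimal sketch of just that failure mode, outside Airflow (the exact string reaching the call is a guess; any `str` raises the same error):

   ```
   # Minimal sketch, not Airflow code: fromtimestamp expects a numeric
   # timestamp, so a str raises the TypeError seen in the traceback above.
   # The template string here is only a guess at the offending value.
   import datetime as dt

   try:
       dt.datetime.fromtimestamp("{{ ts_nodash_with_tz }}", tz=dt.timezone.utc)
   except TypeError as err:
       print(err)  # 'str' object cannot be interpreted as an integer
   ```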
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   DAG code:
   ```
   import os
   import datetime as dt
   from airflow.settings import DAGS_FOLDER
   from airflow.sdk import dag, task
   from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
   from airflow.providers.standard.operators.bash import BashOperator
   from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
   from airflow.models import Variable, DagBag
   from airflow.models.dagbag import sync_bag_to_db
   from airflow.utils.state import DagRunState
   import pendulum

   dag_pool = Variable.get("metadag_pool", "default")


   @dag(
       start_date=pendulum.datetime(2024, 5, 21, tz="UTC"),
       dagrun_timeout=dt.timedelta(seconds=float(Variable.get("TEST_DAG_TIMEOUT", "60"))),
   )
   def run_test_dags():
       """Metadag that collects and runs test dags"""

       @task.python(pool=dag_pool)
       def get_dag_ids(folder):
           """Get dag_ids for dags in the given subfolder"""
           if os.path.dirname(__file__) == folder:
               raise ValueError(
                   "Cannot collect DAGs from the same folder, please use a sub folder"
               )

           dag_bag = DagBag(folder, include_examples=False)
           sync_bag_to_db(dag_bag, "test_dags")
           return dag_bag.dag_ids

       dag_ids = get_dag_ids(os.path.join(DAGS_FOLDER, "test_dags"))
       start = BashOperator(
           task_id="start_tests", bash_command="echo starting tests", pool=dag_pool
       )
       # reserialize = BashOperator(
       #     task_id="reserialize_dags", bash_command="airflow dags reserialize"
       # )
       end = BashOperator(
           task_id="end_tests", bash_command="echo tests finished.", pool=dag_pool
       )

       # Ensure that the execution date for the child dags is the same as the
       # current dagrun to make listening easier
       dagruns = TriggerDagRunOperator.partial(
           task_id="create_dagruns", logical_date="{{ ts_nodash_with_tz }}", pool=dag_pool
       ).expand(trigger_dag_id=dag_ids)

       dag_status = ExternalTaskSensor.partial(
           task_id="check_dag_status",
           external_task_id=None,
           check_existence=True,
           allowed_states=[DagRunState.SUCCESS],
           failed_states=[DagRunState.FAILED],
           pool=dag_pool,
       ).expand(external_dag_id=dag_ids)

       start >> dag_ids >> dagruns >> dag_status >> end


   run_test_dags()
   ```
   
   * add a DAG file with the code above to the dags folder
   * create a fresh Airflow database using `airflow db reset -y`
   * run `airflow dags reserialize` (a minimal sketch of the suspected trigger follows below)
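
   A hypothetical, unverified narrowing of the repro: the traceback goes through `_deserialize_partial_kwargs`, and the only datetime-typed partial kwarg in the DAG above is the templated string passed to `logical_date` of the mapped `TriggerDagRunOperator`. If that hypothesis holds, a much smaller DAG should hit the same error on reserialize:

   ```
   # Hypothetical minimal repro (assumption, not confirmed): a Jinja template
   # string in the datetime-typed `logical_date` partial kwarg of a mapped
   # operator. The dag/function names here are illustrative only.
   import pendulum
   from airflow.sdk import dag
   from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator


   @dag(start_date=pendulum.datetime(2024, 5, 21, tz="UTC"))
   def minimal_partial_kwargs_repro():
       TriggerDagRunOperator.partial(
           task_id="create_dagruns",
           logical_date="{{ ts_nodash_with_tz }}",  # str where a datetime is expected
       ).expand(trigger_dag_id=["bare_minimum_dag"])


   minimal_partial_kwargs_repro()
   ```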
   
   ### Operating System
   
   Airflow official docker image / Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   Providers info

   apache-airflow-providers-celery           | 3.13.0
   apache-airflow-providers-cncf-kubernetes  | 10.9.0
   apache-airflow-providers-common-compat    | 1.8.0
   apache-airflow-providers-common-io        | 1.6.4
   apache-airflow-providers-common-messaging | 2.0.0
   apache-airflow-providers-common-sql       | 1.28.2
   apache-airflow-providers-fab              | 3.0.1
   apache-airflow-providers-git              | 0.0.9
   apache-airflow-providers-grpc             | 3.8.2
   apache-airflow-providers-odbc             | 4.10.2
   apache-airflow-providers-postgres         | 6.4.0
   apache-airflow-providers-standard         | 1.9.1
   
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   This is reproducible when testing a custom Docker image built on the official Airflow base image `apache/airflow:3.1.3-python3.11`, using Google's container-structure-test to run a test script.
   
   ### Anything else?
   
   Python version 3.11

   Using `LocalExecutor` and a Postgres 15 database backend inside the Docker container
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

