nathadfield opened a new issue, #59522:
URL: https://github.com/apache/airflow/issues/59522
### Apache Airflow version
3.1.5
### If "Other Airflow 3 version" selected, which one?
_No response_
### What happened?
On Airflow 3.1.x, a DAG using dynamic task mapping over a custom operator
that has templated `from_date`/`to_date` in `partial_kwargs` parses, but the
scheduler fails to deserialize the serialized DAG.
**This is a regression from v3.0.x**
### What you think should happen instead?
The scheduler should successfully deserialize the DAG. Templated strings in
operator kwargs should remain strings and not be coerced to timestamps during
deserialization.
Here's the full traceback.
```
Traceback (most recent call last):
2025-12-16T16:47:02.151920509Z File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 2552, in deserialize_dag
2025-12-16T16:47:02.152207759Z return
cls._deserialize_dag_internal(encoded_dag, client_defaults)
2025-12-16T16:47:02.152214134Z
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.152215759Z File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 2578, in _deserialize_dag_internal
2025-12-16T16:47:02.152479051Z deser =
SerializedBaseOperator.deserialize_operator(
2025-12-16T16:47:02.152481926Z
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.152482759Z File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 1774, in deserialize_operator
2025-12-16T16:47:02.152641176Z cls.populate_operator(op, encoded_op,
client_defaults)
2025-12-16T16:47:02.152643842Z File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 1616, in populate_operator
2025-12-16T16:47:02.152823384Z v = cls._deserialize_partial_kwargs(v,
client_defaults)
2025-12-16T16:47:02.152827592Z
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.152828509Z File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 2146, in _deserialize_partial_kwargs
2025-12-16T16:47:02.153025467Z deserialized[k] =
cls._deserialize_field_value(k, v)
2025-12-16T16:47:02.153028551Z
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.153029551Z File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 2114, in _deserialize_field_value
2025-12-16T16:47:02.153243551Z return cls._deserialize_datetime(value)
if value is not None else None
2025-12-16T16:47:02.153246634Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.153247342Z File
"/usr/local/lib/python3.12/site-packages/airflow/_shared/timezones/timezone.py",
line 302, in from_timestamp
2025-12-16T16:47:02.153305801Z result =
coerce_datetime(dt.datetime.fromtimestamp(timestamp, tz=utc))
2025-12-16T16:47:02.153309592Z
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.153316634Z TypeError: 'str' object cannot be interpreted
as an integer
2025-12-16T16:47:02.153317676Z
2025-12-16T16:47:02.153318342Z The above exception was the direct cause of
the following exception:
2025-12-16T16:47:02.153319134Z
2025-12-16T16:47:02.153319842Z Traceback (most recent call last):
2025-12-16T16:47:02.153320592Z File "/usr/local/bin/airflow", line 10, in
<module>
2025-12-16T16:47:02.153370801Z sys.exit(main())
2025-12-16T16:47:02.153372134Z ^^^^^^
2025-12-16T16:47:02.153372842Z File
"/usr/local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
2025-12-16T16:47:02.153460551Z args.func(args)
2025-12-16T16:47:02.153462759Z File
"/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49,
in command
2025-12-16T16:47:02.153505509Z return func(*args, **kwargs)
2025-12-16T16:47:02.153506967Z ^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.153508009Z File
"/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in
wrapper
2025-12-16T16:47:02.153586717Z return f(*args, **kwargs)
2025-12-16T16:47:02.153588842Z ^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.153589801Z File
"/usr/local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py",
line 54, in wrapped_function
2025-12-16T16:47:02.153613259Z return func(*args, **kwargs)
2025-12-16T16:47:02.153614384Z ^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.153615176Z File
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/dag_processor_command.py",
line 53, in dag_processor
2025-12-16T16:47:02.153650259Z run_command_with_daemon_option(
2025-12-16T16:47:02.153651634Z File
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py",
line 86, in run_command_with_daemon_option
2025-12-16T16:47:02.153711342Z callback()
2025-12-16T16:47:02.153715884Z File
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/dag_processor_command.py",
line 56, in <lambda>
2025-12-16T16:47:02.153717342Z callback=lambda:
run_job(job=job_runner.job, execute_callable=job_runner._execute),
2025-12-16T16:47:02.153751801Z
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.153753342Z File
"/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 100,
in wrapper
2025-12-16T16:47:02.153827801Z return func(*args, session=session,
**kwargs)
2025-12-16T16:47:02.153834301Z
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.153835467Z File
"/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 368, in
run_job
2025-12-16T16:47:02.153930467Z return execute_job(job,
execute_callable=execute_callable)
2025-12-16T16:47:02.153933384Z
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.153937342Z File
"/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 397, in
execute_job
2025-12-16T16:47:02.154038676Z ret = execute_callable()
2025-12-16T16:47:02.154040717Z ^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.154041551Z File
"/usr/local/lib/python3.12/site-packages/airflow/jobs/dag_processor_job_runner.py",
line 61, in _execute
2025-12-16T16:47:02.154042467Z self.processor.run()
2025-12-16T16:47:02.154043217Z File
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
line 282, in run
2025-12-16T16:47:02.154105134Z return self._run_parsing_loop()
2025-12-16T16:47:02.154108551Z ^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.154109551Z File
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
line 371, in _run_parsing_loop
2025-12-16T16:47:02.154169884Z self._collect_results()
2025-12-16T16:47:02.154173301Z File
"/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 100,
in wrapper
2025-12-16T16:47:02.154254967Z return func(*args, session=session,
**kwargs)
2025-12-16T16:47:02.154258342Z
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.154259176Z File
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
line 842, in _collect_results
2025-12-16T16:47:02.154370384Z self._file_stats[file] =
process_parse_results(
2025-12-16T16:47:02.154373009Z
^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.154373967Z File
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
line 1173, in process_parse_results
2025-12-16T16:47:02.154499259Z update_dag_parsing_results_in_db(
2025-12-16T16:47:02.154502509Z File
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/collection.py",
line 394, in update_dag_parsing_results_in_db
2025-12-16T16:47:02.154555176Z for attempt in
run_with_db_retries(logger=log):
2025-12-16T16:47:02.154557467Z
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.154558467Z File
"/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 445, in
__iter__
2025-12-16T16:47:02.154660634Z do = self.iter(retry_state=retry_state)
2025-12-16T16:47:02.154662759Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.154663592Z File
"/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 378, in
iter
2025-12-16T16:47:02.154728592Z result = action(retry_state)
2025-12-16T16:47:02.154730509Z ^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.154731301Z File
"/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 400, in
<lambda>
2025-12-16T16:47:02.154822551Z self._add_action_func(lambda rs:
rs.outcome.result())
2025-12-16T16:47:02.154826217Z
^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.154830301Z File
"/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result
2025-12-16T16:47:02.154891384Z return self.__get_result()
2025-12-16T16:47:02.154896384Z ^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.154897259Z File
"/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in
__get_result
2025-12-16T16:47:02.154926176Z raise self._exception
2025-12-16T16:47:02.154927301Z File
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/collection.py",
line 404, in update_dag_parsing_results_in_db
2025-12-16T16:47:02.155010467Z SerializedDAG.bulk_write_to_db(
2025-12-16T16:47:02.155012301Z File
"/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 98, in
wrapper
2025-12-16T16:47:02.155013176Z return func(*args, **kwargs)
2025-12-16T16:47:02.155063384Z ^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.155065592Z File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 2898, in bulk_write_to_db
2025-12-16T16:47:02.155318801Z dag_op.update_dags(orm_dags,
parse_duration, session=session)
2025-12-16T16:47:02.155321134Z File
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/collection.py",
line 556, in update_dags
2025-12-16T16:47:02.155414676Z dm.calculate_dagrun_date_fields(dag,
last_automated_data_interval) # type: ignore[arg-type]
2025-12-16T16:47:02.155416676Z
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.155417551Z File
"/usr/local/lib/python3.12/site-packages/airflow/models/dag.py", line 680, in
calculate_dagrun_date_fields
2025-12-16T16:47:02.155499759Z next_dagrun_info =
dag.next_dagrun_info(last_automated_data_interval)
2025-12-16T16:47:02.155501967Z
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.155502967Z File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 3924, in next_dagrun_info
2025-12-16T16:47:02.155842884Z return
self._real_dag.next_dagrun_info(*args, **kwargs)
2025-12-16T16:47:02.155846342Z ^^^^^^^^^^^^^^
2025-12-16T16:47:02.155847301Z File
"/usr/local/lib/python3.12/functools.py", line 998, in __get__
2025-12-16T16:47:02.155977051Z val = self.func(instance)
2025-12-16T16:47:02.155982801Z ^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.155984134Z File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 3933, in _real_dag
2025-12-16T16:47:02.156307634Z return SerializedDAG.from_dict(self.data)
2025-12-16T16:47:02.156311509Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.156312509Z File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 2867, in from_dict
2025-12-16T16:47:02.156555717Z return
cls.deserialize_dag(serialized_obj["dag"], client_defaults)
2025-12-16T16:47:02.156562509Z
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-12-16T16:47:02.156563676Z File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 2558, in deserialize_dag
2025-12-16T16:47:02.156807759Z raise DeserializationError(dag_id) from
err
2025-12-16T16:47:02.156810217Z airflow.exceptions.DeserializationError: An
unexpected error occurred while trying to deserialize Dag
'dtm_from_date_deserialize_error'
```
### How to reproduce
Here's an example DAG that will highlight the issue.
```
from datetime import datetime
from airflow import DAG
from airflow.models.baseoperator import BaseOperator
class MyOp(BaseOperator):
template_fields = ("from_date", "to_date")
def __init__(self, from_date, to_date, marker=None, **kwargs):
super().__init__(**kwargs)
self.from_date = from_date
self.to_date = to_date
self.marker = marker # something to map over
def execute(self, context):
return {"from": self.from_date, "to": self.to_date, "marker":
self.marker}
with DAG(
dag_id="dtm_from_date_deserialize_error",
start_date=datetime(2024, 1, 1),
schedule="@daily",
catchup=False,
) as dag:
MyOp.partial(
task_id="mapped",
from_date="{{ macros.ds_add(ds, -7) }}",
to_date="{{ macros.ds_add(ds, -1) }}",
).expand(marker=[1]) # map over a primitive to force partial_kwargs
serialization
```
### Operating System
PRETTY_NAME="Debian GNU/Linux 12 (bookworm)" NAME="Debian GNU/Linux"
VERSION_ID="12" VERSION="12 (bookworm)" VERSION_CODENAME=bookworm ID=debian
### Versions of Apache Airflow Providers
n/a
### Deployment
Astronomer
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]