nathadfield opened a new issue, #59522:
URL: https://github.com/apache/airflow/issues/59522

   ### Apache Airflow version
   
   3.1.5
   
   ### If "Other Airflow 3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   On Airflow 3.1.x, a DAG that uses dynamic task mapping over a custom operator 
with templated `from_date`/`to_date` values in `partial_kwargs` parses successfully, 
but the scheduler then fails to deserialize the serialized DAG with 
`TypeError: 'str' object cannot be interpreted as an integer`.
   
   **This is a regression from v3.0.x**
   
   ### What you think should happen instead?
   
   The scheduler should successfully deserialize the DAG. Templated strings in 
operator kwargs should remain strings and not be coerced to timestamps during 
deserialization.
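
The expected behavior described above can be sketched with the standard library alone. This is only an illustration of the idea, not Airflow's implementation: `deserialize_date_field` is a hypothetical helper name, and the assumption is that a date-like field may hold either a numeric POSIX timestamp or an unrendered template string.

```python
import datetime as dt
from typing import Any


def deserialize_date_field(value: Any) -> Any:
    """Hypothetical guard: convert numeric timestamps, pass everything else through."""
    if value is None:
        return None
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return dt.datetime.fromtimestamp(value, tz=dt.timezone.utc)
    # Templated strings (and any other non-numeric value) are left untouched.
    return value


# A templated string stays a string; a numeric timestamp becomes an aware datetime.
print(deserialize_date_field("{{ macros.ds_add(ds, -7) }}"))
print(deserialize_date_field(0))
```

With a guard like this, the type of the stored value decides whether a conversion happens, so an unrendered template is never passed to the timestamp conversion.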
   
   Here's the full traceback.
   ```
   Traceback (most recent call last):
   2025-12-16T16:47:02.151920509Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
 line 2552, in deserialize_dag
   2025-12-16T16:47:02.152207759Z     return 
cls._deserialize_dag_internal(encoded_dag, client_defaults)
   2025-12-16T16:47:02.152214134Z            
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.152215759Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
 line 2578, in _deserialize_dag_internal
   2025-12-16T16:47:02.152479051Z     deser = 
SerializedBaseOperator.deserialize_operator(
   2025-12-16T16:47:02.152481926Z             
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.152482759Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
 line 1774, in deserialize_operator
   2025-12-16T16:47:02.152641176Z     cls.populate_operator(op, encoded_op, 
client_defaults)
   2025-12-16T16:47:02.152643842Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
 line 1616, in populate_operator
   2025-12-16T16:47:02.152823384Z     v = cls._deserialize_partial_kwargs(v, 
client_defaults)
   2025-12-16T16:47:02.152827592Z         
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.152828509Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
 line 2146, in _deserialize_partial_kwargs
   2025-12-16T16:47:02.153025467Z     deserialized[k] = 
cls._deserialize_field_value(k, v)
   2025-12-16T16:47:02.153028551Z                       
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.153029551Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
 line 2114, in _deserialize_field_value
   2025-12-16T16:47:02.153243551Z     return cls._deserialize_datetime(value) 
if value is not None else None
   2025-12-16T16:47:02.153246634Z            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.153247342Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/_shared/timezones/timezone.py",
 line 302, in from_timestamp
   2025-12-16T16:47:02.153305801Z     result = 
coerce_datetime(dt.datetime.fromtimestamp(timestamp, tz=utc))
   2025-12-16T16:47:02.153309592Z                              
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.153316634Z TypeError: 'str' object cannot be interpreted 
as an integer
   2025-12-16T16:47:02.153317676Z 
   2025-12-16T16:47:02.153318342Z The above exception was the direct cause of 
the following exception:
   2025-12-16T16:47:02.153319134Z 
   2025-12-16T16:47:02.153319842Z Traceback (most recent call last):
   2025-12-16T16:47:02.153320592Z   File "/usr/local/bin/airflow", line 10, in 
<module>
   2025-12-16T16:47:02.153370801Z     sys.exit(main())
   2025-12-16T16:47:02.153372134Z              ^^^^^^
   2025-12-16T16:47:02.153372842Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
   2025-12-16T16:47:02.153460551Z     args.func(args)
   2025-12-16T16:47:02.153462759Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, 
in command
   2025-12-16T16:47:02.153505509Z     return func(*args, **kwargs)
   2025-12-16T16:47:02.153506967Z            ^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.153508009Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in 
wrapper
   2025-12-16T16:47:02.153586717Z     return f(*args, **kwargs)
   2025-12-16T16:47:02.153588842Z            ^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.153589801Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py",
 line 54, in wrapped_function
   2025-12-16T16:47:02.153613259Z     return func(*args, **kwargs)
   2025-12-16T16:47:02.153614384Z            ^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.153615176Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/dag_processor_command.py",
 line 53, in dag_processor
   2025-12-16T16:47:02.153650259Z     run_command_with_daemon_option(
   2025-12-16T16:47:02.153651634Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", 
line 86, in run_command_with_daemon_option
   2025-12-16T16:47:02.153711342Z     callback()
   2025-12-16T16:47:02.153715884Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/dag_processor_command.py",
 line 56, in <lambda>
   2025-12-16T16:47:02.153717342Z     callback=lambda: 
run_job(job=job_runner.job, execute_callable=job_runner._execute),
   2025-12-16T16:47:02.153751801Z                      
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.153753342Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, 
in wrapper
   2025-12-16T16:47:02.153827801Z     return func(*args, session=session, 
**kwargs)
   2025-12-16T16:47:02.153834301Z            
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.153835467Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 368, in 
run_job
   2025-12-16T16:47:02.153930467Z     return execute_job(job, 
execute_callable=execute_callable)
   2025-12-16T16:47:02.153933384Z            
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.153937342Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 397, in 
execute_job
   2025-12-16T16:47:02.154038676Z     ret = execute_callable()
   2025-12-16T16:47:02.154040717Z           ^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.154041551Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/dag_processor_job_runner.py",
 line 61, in _execute
   2025-12-16T16:47:02.154042467Z     self.processor.run()
   2025-12-16T16:47:02.154043217Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", 
line 282, in run
   2025-12-16T16:47:02.154105134Z     return self._run_parsing_loop()
   2025-12-16T16:47:02.154108551Z            ^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.154109551Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", 
line 371, in _run_parsing_loop
   2025-12-16T16:47:02.154169884Z     self._collect_results()
   2025-12-16T16:47:02.154173301Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, 
in wrapper
   2025-12-16T16:47:02.154254967Z     return func(*args, session=session, 
**kwargs)
   2025-12-16T16:47:02.154258342Z            
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.154259176Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", 
line 842, in _collect_results
   2025-12-16T16:47:02.154370384Z     self._file_stats[file] = 
process_parse_results(
   2025-12-16T16:47:02.154373009Z                              
^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.154373967Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", 
line 1173, in process_parse_results
   2025-12-16T16:47:02.154499259Z     update_dag_parsing_results_in_db(
   2025-12-16T16:47:02.154502509Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/collection.py", 
line 394, in update_dag_parsing_results_in_db
   2025-12-16T16:47:02.154555176Z     for attempt in 
run_with_db_retries(logger=log):
   2025-12-16T16:47:02.154557467Z                    
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.154558467Z   File 
"/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 445, in 
__iter__
   2025-12-16T16:47:02.154660634Z     do = self.iter(retry_state=retry_state)
   2025-12-16T16:47:02.154662759Z          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.154663592Z   File 
"/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 378, in 
iter
   2025-12-16T16:47:02.154728592Z     result = action(retry_state)
   2025-12-16T16:47:02.154730509Z              ^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.154731301Z   File 
"/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 400, in 
<lambda>
   2025-12-16T16:47:02.154822551Z     self._add_action_func(lambda rs: 
rs.outcome.result())
   2025-12-16T16:47:02.154826217Z                                      
^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.154830301Z   File 
"/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result
   2025-12-16T16:47:02.154891384Z     return self.__get_result()
   2025-12-16T16:47:02.154896384Z            ^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.154897259Z   File 
"/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in 
__get_result
   2025-12-16T16:47:02.154926176Z     raise self._exception
   2025-12-16T16:47:02.154927301Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/collection.py", 
line 404, in update_dag_parsing_results_in_db
   2025-12-16T16:47:02.155010467Z     SerializedDAG.bulk_write_to_db(
   2025-12-16T16:47:02.155012301Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 98, in 
wrapper
   2025-12-16T16:47:02.155013176Z     return func(*args, **kwargs)
   2025-12-16T16:47:02.155063384Z            ^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.155065592Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
 line 2898, in bulk_write_to_db
   2025-12-16T16:47:02.155318801Z     dag_op.update_dags(orm_dags, 
parse_duration, session=session)
   2025-12-16T16:47:02.155321134Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/dag_processing/collection.py", 
line 556, in update_dags
   2025-12-16T16:47:02.155414676Z     dm.calculate_dagrun_date_fields(dag, 
last_automated_data_interval)  # type: ignore[arg-type]
   2025-12-16T16:47:02.155416676Z     
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.155417551Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/models/dag.py", line 680, in 
calculate_dagrun_date_fields
   2025-12-16T16:47:02.155499759Z     next_dagrun_info = 
dag.next_dagrun_info(last_automated_data_interval)
   2025-12-16T16:47:02.155501967Z                        
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.155502967Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
 line 3924, in next_dagrun_info
   2025-12-16T16:47:02.155842884Z     return 
self._real_dag.next_dagrun_info(*args, **kwargs)
   2025-12-16T16:47:02.155846342Z            ^^^^^^^^^^^^^^
   2025-12-16T16:47:02.155847301Z   File 
"/usr/local/lib/python3.12/functools.py", line 998, in __get__
   2025-12-16T16:47:02.155977051Z     val = self.func(instance)
   2025-12-16T16:47:02.155982801Z           ^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.155984134Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
 line 3933, in _real_dag
   2025-12-16T16:47:02.156307634Z     return SerializedDAG.from_dict(self.data)
   2025-12-16T16:47:02.156311509Z            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.156312509Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
 line 2867, in from_dict
   2025-12-16T16:47:02.156555717Z     return 
cls.deserialize_dag(serialized_obj["dag"], client_defaults)
   2025-12-16T16:47:02.156562509Z            
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   2025-12-16T16:47:02.156563676Z   File 
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
 line 2558, in deserialize_dag
   2025-12-16T16:47:02.156807759Z     raise DeserializationError(dag_id) from 
err
   2025-12-16T16:47:02.156810217Z airflow.exceptions.DeserializationError: An 
unexpected error occurred while trying to deserialize Dag 
'dtm_from_date_deserialize_error'
   ```
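
The innermost frame of the traceback can be reproduced without Airflow. The sketch below assumes that the value reaching `from_timestamp` is the unrendered template string from the DAG in this report, and passes it to `datetime.fromtimestamp` the same way that frame does.

```python
import datetime as dt

# Unrendered Jinja template, as it would be stored in partial_kwargs (assumption).
templated_value = "{{ macros.ds_add(ds, -7) }}"

try:
    dt.datetime.fromtimestamp(templated_value, tz=dt.timezone.utc)
except TypeError as exc:
    caught = exc
    print(f"TypeError: {exc}")
else:
    caught = None
```

The exact wording of the message can differ between Python versions, but the call raises `TypeError` for any string argument.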
   
   ### How to reproduce
   
   The following DAG reproduces the issue.
   
   ```
   from datetime import datetime
   
   from airflow import DAG
   from airflow.models.baseoperator import BaseOperator
   
   
   class MyOp(BaseOperator):
       # Both fields are Jinja-templated, so they hold plain strings at parse time.
       template_fields = ("from_date", "to_date")
   
       def __init__(self, from_date, to_date, marker=None, **kwargs):
           super().__init__(**kwargs)
           self.from_date = from_date
           self.to_date = to_date
           self.marker = marker  # something to map over
   
       def execute(self, context):
           return {"from": self.from_date, "to": self.to_date, "marker": self.marker}
   
   
   with DAG(
       dag_id="dtm_from_date_deserialize_error",
       start_date=datetime(2024, 1, 1),
       schedule="@daily",
       catchup=False,
   ) as dag:
       # Map over a primitive to force partial_kwargs serialization.
       MyOp.partial(
           task_id="mapped",
           from_date="{{ macros.ds_add(ds, -7) }}",
           to_date="{{ macros.ds_add(ds, -1) }}",
       ).expand(marker=[1])
   ```
   
   ### Operating System
   
   PRETTY_NAME="Debian GNU/Linux 12 (bookworm)" NAME="Debian GNU/Linux" 
VERSION_ID="12" VERSION="12 (bookworm)" VERSION_CODENAME=bookworm ID=debian
   
   ### Versions of Apache Airflow Providers
   
   n/a
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

