gingeekrishna opened a new pull request, #69091:
URL: https://github.com/apache/airflow/pull/69091

   ## Summary
   
   Fixes #68941
   
   ### Problem
   
   When `DagParam` is used as a kwarg in `.partial()` of a dynamically mapped task, `BaseSerialization.serialize()` had no branch for `DagParam` and fell through to `cls.default_serialization(strict, var)`, which calls `str(var)`. Because `str(DagParam)` includes the object's memory address (e.g. `<DagParam object at 0x7f3a...>`), the serialized `partial_kwargs` changed on every scheduler parse. A new DAG version was therefore written on each cycle: **DAG version inflation**.
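The effect can be reproduced without Airflow. This is a minimal sketch. `Param`, `fallback_encode`, `stable_encode`, and `fingerprint` are hypothetical names, and hashing canonical JSON is only an assumed stand-in for however the scheduler decides that a DAG has changed.

```python
import hashlib
import json


class Param:
    """Hypothetical stand-in for a class, like DagParam, with no custom __repr__."""

    def __init__(self, dag_id, name, default):
        self.dag_id = dag_id
        self.name = name
        self.default = default


def fallback_encode(obj):
    # A str() fallback: the default object repr embeds the memory address.
    return str(obj)


def stable_encode(obj):
    # Encode only identifying fields, so equivalent params encode identically.
    return {"dag_id": obj.dag_id, "name": obj.name, "default": obj.default}


def fingerprint(partial_kwargs, encode):
    # Hash a canonical JSON dump (assumed stand-in for a DAG-version check).
    payload = {key: encode(value) for key, value in partial_kwargs.items()}
    return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()


# Two scheduler parses of the same file build two distinct objects.
parse_1 = {"value": Param("repro", "p", "default")}
parse_2 = {"value": Param("repro", "p", "default")}

print(fingerprint(parse_1, fallback_encode) == fingerprint(parse_2, fallback_encode))  # False
print(fingerprint(parse_1, stable_encode) == fingerprint(parse_2, stable_encode))  # True
```

With the `str()` fallback, two parses of an unchanged file look like different DAGs. Encoding only the identifying fields makes them compare equal.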
   
   **Reproducer:**
   ```python
   from airflow.sdk import DAG, task
   
   with DAG(dag_id="repro", schedule=None) as dag:
       @task
       def add(value):
           return value
   
       # DagParam passed as a kwarg to .partial() of a mapped task
       add.partial(value=dag.param("p", "default")).expand(value=[1, 2, 3])
   ```
   
   ### Fix
   
   1. **`enums.py`**: add `DAT.DAG_PARAM = "dag_param"` to the type-dispatch enum.
   2. **`serialized_objects.py`**:
      - Add `DagParam` to the imports.
      - Add a `_DagParamRef` NamedTuple (modelled on `_XComRef`) that acts as a stable placeholder during deserialization, holding `dag_id`, `name`, and `default` until the DAG is available.
      - Add a `DagParam` branch in `BaseSerialization.serialize()` that encodes the stable triple `(dag_id, name, default)` using the new `DAT.DAG_PARAM` type.
      - Add a `DAT.DAG_PARAM` case in `BaseSerialization.deserialize()` that returns a `_DagParamRef` placeholder.
      - Add `OperatorSerialization._resolve_dag_param_refs()` to recursively resolve placeholders once the DAG is hydrated.
      - Update `set_task_dag_references()` to call `_resolve_dag_param_refs` on `partial_kwargs` for `MappedOperator`.
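The encode/decode half of the fix can be sketched as follows. This is a simplification, not the PR's code. It assumes the `{"__type": ..., "__var": ...}` envelope that `BaseSerialization` uses for other types, and `DagParamRef`, `serialize_param`, and `deserialize` are hypothetical names.

```python
from typing import Any, NamedTuple

# Assumed type tag, mirroring the DAT.DAG_PARAM value added by the PR.
DAG_PARAM = "dag_param"


class DagParamRef(NamedTuple):
    """Hypothetical placeholder holding the stable triple until a DAG exists."""

    dag_id: str
    name: str
    default: Any


def serialize_param(dag_id: str, name: str, default: Any) -> dict:
    # Only identifying fields are written, so no object identity leaks in.
    return {
        "__type": DAG_PARAM,
        "__var": {"dag_id": dag_id, "name": name, "default": default},
    }


def deserialize(encoded: Any) -> Any:
    # Walk containers; a tagged dict becomes a placeholder, not a live object.
    if isinstance(encoded, dict):
        if encoded.get("__type") == DAG_PARAM:
            var = encoded["__var"]
            return DagParamRef(var["dag_id"], var["name"], var["default"])
        return {key: deserialize(value) for key, value in encoded.items()}
    if isinstance(encoded, list):
        return [deserialize(item) for item in encoded]
    return encoded


partial_kwargs = {"value": serialize_param("repro", "p", "default")}
print(deserialize(partial_kwargs))
# {'value': DagParamRef(dag_id='repro', name='p', default='default')}
```

Deserialization never constructs a live parameter here. It only records enough to build one later, which is why it can run before the DAG exists.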
   
   ### Why `_DagParamRef` instead of deserializing immediately?
   
   `DagParam.__init__` requires a live `DAG` object (`current_dag.params[name] = default`), but `partial_kwargs` are deserialized before the full DAG is assembled. Deferring resolution follows the same pattern that `_XComRef` already uses for `XComArg`.
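The deferred-resolution pass can be sketched in the same way. `FakeDag`, `LiveParam`, and `resolve_refs` are hypothetical stand-ins for the DAG, `DagParam`, and `_resolve_dag_param_refs`. The sketch shows only that each placeholder is replaced by a live object once there is a DAG to register it with.

```python
from typing import Any, NamedTuple


class DagParamRef(NamedTuple):
    """Hypothetical placeholder produced during deserialization."""

    dag_id: str
    name: str
    default: Any


class FakeDag:
    """Hypothetical stand-in for a hydrated DAG."""

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id
        self.params: dict[str, Any] = {}


class LiveParam:
    """Hypothetical stand-in for DagParam: construction needs a live DAG."""

    def __init__(self, current_dag: FakeDag, name: str, default: Any) -> None:
        current_dag.params[name] = default  # registering requires the DAG
        self.current_dag = current_dag
        self._name = name
        self._default = default


def resolve_refs(value: Any, dag: FakeDag) -> Any:
    # Recursively swap placeholders for live params inside dicts and lists.
    if isinstance(value, DagParamRef):
        return LiveParam(dag, value.name, value.default)
    if isinstance(value, dict):
        return {key: resolve_refs(item, dag) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve_refs(item, dag) for item in value]
    return value


dag = FakeDag("repro")
resolved = resolve_refs({"value": DagParamRef("repro", "p", "default")}, dag)
print(type(resolved["value"]).__name__, dag.params)  # LiveParam {'p': 'default'}
```

Running resolution as a separate pass after the DAG is built keeps deserialization independent of construction order. The real `_resolve_dag_param_refs` may walk a different set of containers.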
   
   ## Tests
   
   Four regression tests added to `test_dag_serialization.py`:
   
   | Test | What it verifies |
   |---|---|
   | `test_dagparam_in_partial_is_serialized_stably` | No memory address in serialized `partial_kwargs`; `dag_param` encoding contains correct `name`/`dag_id`/`default`; `_DagParamRef` placeholder created on individual-op deserialization |
   | `test_dagparam_in_partial_roundtrip` | Full `DagSerialization.to_dict` / `from_dict` cycle produces a live `DagParam` with correct `_name`, `_default`, and `current_dag` reference |
   | `test_dagparam_in_partial_version_stability` | Two serializations of the same DAG produce identical output (no inflation) |
   | `test_dagparam_in_partial_no_default` | `DagParam` with no default (`NOTSET`) round-trips correctly |

