gingeekrishna opened a new pull request, #69091:
URL: https://github.com/apache/airflow/pull/69091
## Summary
Fixes #68941
### Problem
When `DagParam` is used as a kwarg in `.partial()` of a dynamically mapped
task, `BaseSerialization.serialize()` had no branch for `DagParam` and fell
through to `cls.default_serialization(strict, var)` which calls `str(var)`.
Because the default `str()` of a `DagParam` instance includes the object's
memory address (e.g. `<DagParam object at 0x7f3a...>`), the serialized
`partial_kwargs` changed on every scheduler parse, causing a new DAG version
to be written on each cycle: **DAG version inflation**.
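The instability can be illustrated without Airflow. The following is a minimal sketch, assuming a hypothetical `FakeParam` class that stands in for `DagParam` and defines neither `__str__` nor `__repr__`. The helper names `unstable_encoding` and `stable_encoding` are also made up for illustration. Two equivalent objects stringify differently because the default repr embeds the object's address, while a tuple of their fields stays the same.

```python
class FakeParam:
    """Hypothetical stand-in for DagParam; it defines no __str__ or __repr__."""

    def __init__(self, dag_id, name, default):
        self.dag_id = dag_id
        self.name = name
        self.default = default


def unstable_encoding(param):
    # Falls back to object.__repr__, which embeds the object's address in hex.
    return str(param)


def stable_encoding(param):
    # Depends only on the fields, so equal inputs always encode identically.
    return (param.dag_id, param.name, param.default)


# Two logically identical parameters, as two separate parses would create.
first = FakeParam("repro", "p", "default")
second = FakeParam("repro", "p", "default")

addresses_differ = unstable_encoding(first) != unstable_encoding(second)
fields_match = stable_encoding(first) == stable_encoding(second)
```

Any hash or equality check computed over the first encoding would report a change on every parse, which is the behaviour described above.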
**Reproducer:**
```python
from airflow.sdk import DAG, task

with DAG(dag_id="repro", schedule=None) as dag:

    @task
    def add(x, value):
        return value

    # The DagParam goes through .partial(); the mapped argument needs a different name.
    add.partial(value=dag.param("p", "default")).expand(x=[1, 2, 3])
```
### Fix
1. **`enums.py`**: Add `DAT.DAG_PARAM = "dag_param"` to the type dispatch
enum.
2. **`serialized_objects.py`**:
- Add `DagParam` to the imports.
- Add a `_DagParamRef` NamedTuple (modelled on `_XComRef`) that acts as a
stable placeholder during deserialization, holding `dag_id`, `name`, and
`default` until the DAG is available.
- Add a `DagParam` branch in `BaseSerialization.serialize()` that encodes
the stable triple `(dag_id, name, default)` using the new `DAT.DAG_PARAM` type.
- Add a `DAT.DAG_PARAM` case in `BaseSerialization.deserialize()` that
returns a `_DagParamRef` placeholder.
- Add `OperatorSerialization._resolve_dag_param_refs()` to recursively
resolve placeholders once the DAG is hydrated.
- Update `set_task_dag_references()` to call `_resolve_dag_param_refs` on
`partial_kwargs` for `MappedOperator`.
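The serialize and deserialize halves of this change can be sketched in isolation. This is not the code from the PR: `FakeDag`, `FakeDagParam`, `DagParamRef`, `encode`, and `decode` are hypothetical stand-ins, and the `"type"`/`"var"` keys are illustrative rather than Airflow's actual encoding. The sketch only shows the idea of encoding a stable triple and decoding to a placeholder.

```python
from typing import Any, NamedTuple


class FakeDag:
    """Hypothetical stand-in for a DAG; holds only an id and a params dict."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.params = {}


class FakeDagParam:
    """Hypothetical stand-in for DagParam; it is bound to a DAG on creation."""

    def __init__(self, current_dag, name, default=None):
        current_dag.params[name] = default
        self.current_dag = current_dag
        self.name = name
        self.default = default


class DagParamRef(NamedTuple):
    """Placeholder carrying the stable triple until a DAG is available."""

    dag_id: str
    name: str
    default: Any


def encode(value):
    # Encode a parameter as a tagged dict of its stable fields, never str(value).
    if isinstance(value, FakeDagParam):
        fields = {
            "dag_id": value.current_dag.dag_id,
            "name": value.name,
            "default": value.default,
        }
        return {"type": "dag_param", "var": fields}
    return value


def decode(value):
    # No DAG exists at this point, so return a placeholder, not a live object.
    if isinstance(value, dict) and value.get("type") == "dag_param":
        return DagParamRef(**value["var"])
    return value


dag = FakeDag("repro")
partial_kwargs = {"value": FakeDagParam(dag, "p", "default"), "retries": 2}
encoded = {key: encode(val) for key, val in partial_kwargs.items()}
decoded = {key: decode(val) for key, val in encoded.items()}
```

Encoding the same `partial_kwargs` twice yields equal dictionaries, since nothing in the output depends on object identity.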
### Why `_DagParamRef` instead of deserializing immediately?
`DagParam.__init__` requires a live `DAG` object (`current_dag.params[name]
= default`) but `partial_kwargs` are deserialized before the full DAG is
assembled. This is the same deferred-resolution pattern already used by
`_XComRef` for `XComArg`.
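A deferred-resolution pass of this kind might look like the sketch below. It assumes hypothetical stand-ins (`ParamRef`, `FakeDag`, `FakeDagParam`, `resolve_refs`) and is not the implementation of `_resolve_dag_param_refs`. It shows placeholders nested inside containers being replaced with live objects once a DAG exists.

```python
from typing import Any, NamedTuple


class ParamRef(NamedTuple):
    """Hypothetical placeholder produced during deserialization."""

    dag_id: str
    name: str
    default: Any


class FakeDag:
    """Hypothetical stand-in for a fully assembled DAG."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.params = {}


class FakeDagParam:
    """Hypothetical stand-in for DagParam; construction registers on the DAG."""

    def __init__(self, current_dag, name, default=None):
        current_dag.params[name] = default
        self.current_dag = current_dag
        self.name = name
        self.default = default


def resolve_refs(value, dag):
    # Check the placeholder first: it is itself a tuple subclass.
    if isinstance(value, ParamRef):
        return FakeDagParam(dag, value.name, value.default)
    if isinstance(value, dict):
        return {key: resolve_refs(val, dag) for key, val in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(resolve_refs(item, dag) for item in value)
    # Anything else is already a plain value and passes through unchanged.
    return value


# State right after deserialization: placeholders only, no DAG yet.
decoded_kwargs = {
    "value": ParamRef("repro", "p", "default"),
    "nested": [ParamRef("repro", "q", 1)],
    "retries": 2,
}

# Later, once the DAG has been assembled, placeholders become live objects.
dag = FakeDag("repro")
resolved = resolve_refs(decoded_kwargs, dag)
```

Keeping resolution as a separate pass means deserialization never needs a DAG, at the cost of one extra walk over `partial_kwargs`.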
## Tests
Four regression tests added to `test_dag_serialization.py`:
| Test | What it verifies |
|---|---|
| `test_dagparam_in_partial_is_serialized_stably` | No memory address in serialized `partial_kwargs`; `dag_param` encoding contains correct `name`/`dag_id`/`default`; `_DagParamRef` placeholder created on individual-op deserialization |
| `test_dagparam_in_partial_roundtrip` | Full `DagSerialization.to_dict / from_dict` cycle produces a live `DagParam` with correct `_name`, `_default`, and `current_dag` reference |
| `test_dagparam_in_partial_version_stability` | Two serializations of the same DAG produce identical output (no inflation) |
| `test_dagparam_in_partial_no_default` | `DagParam` with no default (`NOTSET`) round-trips correctly |