Lee-W commented on code in PR #66782:
URL: https://github.com/apache/airflow/pull/66782#discussion_r3233150407
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1476,10 +1476,20 @@ def register_asset_changes_in_db(
SerializedAssetUriRef,
)
- # TODO: AIP-76 should we provide an interface to override this, so that the task can
- # tell the truth if for some reason it touches a different partition?
- # https://github.com/apache/airflow/issues/58474
- partition_key = ti.dag_run.partition_key
+ events_by_asset: dict[SerializedAssetUniqueKey, list[tuple[dict, str | None]]] = defaultdict(list)
Review Comment:
It would be easier to read this way, with a named tuple in place of the bare `tuple[dict, str | None]`:
```suggestion
payloads_by_asset: dict[SerializedAssetUniqueKey, list[OutletEventPayload]] = defaultdict(list)
```
```python
from typing import NamedTuple


class OutletEventPayload(NamedTuple):
    extra: dict
    partition_key: str | None
```
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1476,10 +1476,20 @@ def register_asset_changes_in_db(
SerializedAssetUriRef,
)
- # TODO: AIP-76 should we provide an interface to override this, so that the task can
- # tell the truth if for some reason it touches a different partition?
- # https://github.com/apache/airflow/issues/58474
- partition_key = ti.dag_run.partition_key
+ events_by_asset: dict[SerializedAssetUniqueKey, list[tuple[dict, str | None]]] = defaultdict(list)
+ for outlet_event in outlet_events:
+     if "source_alias_name" in outlet_event:
+         continue
+     asset_key = SerializedAssetUniqueKey(**outlet_event["dest_asset_key"])
+     events_by_asset[asset_key].append((outlet_event["extra"], outlet_event.get("partition_key")))
+
+ runtime_pks: set[str] = {
+     pk for events in events_by_asset.values() for _, pk in events if pk is not None
+ }
+ if len(runtime_pks) == 1 and ti.dag_run.partition_key is None:
Review Comment:
Let's also add a comment here to explain what this condition checks and why.
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1476,10 +1476,20 @@ def register_asset_changes_in_db(
SerializedAssetUriRef,
)
- # TODO: AIP-76 should we provide an interface to override this, so that the task can
- # tell the truth if for some reason it touches a different partition?
- # https://github.com/apache/airflow/issues/58474
- partition_key = ti.dag_run.partition_key
+ events_by_asset: dict[SerializedAssetUniqueKey, list[tuple[dict, str | None]]] = defaultdict(list)
+ for outlet_event in outlet_events:
+     if "source_alias_name" in outlet_event:
Review Comment:
```suggestion
# Alias-emitted events are handled separately further down via
# register_asset_change_for_alias, which uses the DagRun-level
# partition_key. Per-emission partition keys do not fan out through
# the alias path: emission via an alias produces one event per
# resolved asset, all carrying the same dag_run_partition_key.
if "source_alias_name" in outlet_event:
```