Lee-W commented on code in PR #65447:
URL: https://github.com/apache/airflow/pull/65447#discussion_r3217188360
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1138,7 +1144,11 @@ def _serialize_outlet_events(events:
OutletEventAccessorsProtocol) -> Iterator[d
# Further filtering will be done in the API server.
for key, accessor in events._dict.items():
if isinstance(key, AssetUniqueKey):
- yield {"dest_asset_key": attrs.asdict(key), "extra":
accessor.extra}
+ yield {
+ "dest_asset_key": attrs.asdict(key),
+ "extra": accessor.extra,
+ "partition_keys": list(accessor.partition_keys),
+ }
Review Comment:
```suggestion
if accessor.partition_keys:
yield from (
{
"dest_asset_key": attrs.asdict(key),
"extra": accessor.extra,
"partition_key": partition_key,
}
for partition_key in accessor.partition_keys
)
else:
yield {"dest_asset_key": attrs.asdict(key), "extra":
accessor.extra}
```
since we're to move away from `partition_keys` as it could be confusing, I
think we're going to change it to something like this
##########
airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py:
##########
@@ -61,6 +61,23 @@ def remove_partition_key_from_asset_events(response:
ResponseInfo) -> None: # t
elem.pop("partition_key", None)
+class AddOutletPartitionKeysField(VersionChange):
Review Comment:
Then, we'll need to revert it as well.
##########
airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py:
##########
@@ -54,6 +55,7 @@ class AssetEventResponse(BaseModel):
source_run_id: str | None = None
source_map_index: int | None = None
partition_key: str | None = None
+ partition_keys: list[str] = Field(default_factory=list)
Review Comment:
yep, let's remove it and do it another way
##########
task-sdk/src/airflow/sdk/definitions/asset/decorators.py:
##########
@@ -36,9 +36,33 @@
from airflow.sdk.bases.decorator import _TaskDecorator
from airflow.sdk.definitions.dag import DagStateChangeCallback, ScheduleArg
from airflow.sdk.definitions.param import ParamsDict
+ from airflow.sdk.types import OutletEventAccessorsProtocol
from airflow.triggers.base import BaseTrigger
+_INVALID_INLET_ASSET_NAMES = ("self", "context", "outlet_events")
+
+
+class _AssetSelfProxy:
+ """Proxy for ``self`` in ``@asset`` functions; intercepts
``partition_keys`` writes and forwards them to the outlet event accessor."""
+
+ def __init__(self, asset: Asset, outlet_events:
OutletEventAccessorsProtocol) -> None:
+ object.__setattr__(self, "_asset", asset)
+ object.__setattr__(self, "_outlet_events", outlet_events)
+
+ def __getattr__(self, name: str) -> Any:
+ if name == "partition_keys":
+ return self._outlet_events[self._asset].partition_keys
+ return getattr(self._asset, name)
+
+ def __setattr__(self, name: str, value: Any) -> None:
+ if name != "partition_keys":
+ raise AttributeError(
+ f"Cannot set {name!r} on @asset self; only 'partition_keys' is
settable at runtime"
+ )
+ self._outlet_events[self._asset].partition_keys = value
Review Comment:
I think this was more for a syntax sugar, but yep, let's remove it if it's
too hard to comprehend.
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -488,6 +488,13 @@ class OutletEventAccessor(_AssetRefResolutionMixin):
key: BaseAssetUniqueKey
extra: dict[str, JsonValue] = attrs.Factory(dict)
asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
+ partition_keys: list[str] = attrs.field(factory=list)
+
+ def add_partitions(self, keys: str | list[str]) -> None:
+ """Append a partition key to :attr:`partition_keys`."""
+ if isinstance(keys, str):
+ keys = [keys]
+ self.partition_keys.extend(keys)
Review Comment:
It won't be an issue if it's in different runs.
But for the same ti, we can use the latest one (basically, a set. IIRC,
that's how asset alias work as well )
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]