uranusjr commented on code in PR #58289:
URL: https://github.com/apache/airflow/pull/58289#discussion_r2536706045
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -229,12 +242,40 @@ def notify_asset_alias_created(asset_assets: AssetAlias):
def notify_asset_changed(asset: Asset):
"""Run applicable notification actions when an asset is changed."""
try:
+ # todo: AIP-76 this will have to change. needs to know *what*
happened to the asset (e.g. partition key)
+ # maybe we should just add the event to the signature
+ # or add a new hook `on_asset_event`
get_listener_manager().hook.on_asset_changed(asset=asset)
except Exception:
log.exception("error calling listener")
@classmethod
- def _queue_dagruns(cls, asset_id: int, dags_to_queue: set[DagModel],
session: Session) -> None:
+ def _queue_dagruns(
+ cls,
+ asset_id: int,
+ dags_to_queue: set[DagModel],
+ partition_key: str | None,
+ event: AssetEvent,
+ session: Session,
+ ) -> None:
+ log.debug("dags to queue", dags_to_queue=dags_to_queue)
+
+ if not dags_to_queue:
+ return None
+
+ partition_dags = cls._queue_partitioned_dags(
+ asset_id=asset_id,
+ all_dags=dags_to_queue,
+ event=event,
+ partition_key=partition_key,
+ session=session,
+ )
+ for d in partition_dags:
+ dags_to_queue.remove(d) # don't double process
Review Comment:
```suggestion
dags_to_queue = dags_to_queue.difference(partition_dags) # don't
double process
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]