Lee-W commented on code in PR #58289:
URL: https://github.com/apache/airflow/pull/58289#discussion_r2570609713


##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -229,12 +242,39 @@ def notify_asset_alias_created(asset_assets: AssetAlias):
     def notify_asset_changed(asset: Asset):
         """Run applicable notification actions when an asset is changed."""
         try:
+            # TODO: AIP-76 this will have to change. needs to know *what* happened to the asset (e.g. partition key)

Review Comment:
   `on_asset_changed` itself might make more sense for cases where the asset becomes inactive or active?
   
   `on_asset_event` makes more sense to me for these things
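   
   For illustration, a minimal sketch of what such a hook spec could look like (the `on_asset_event` name and its parameters are assumptions for discussion, not something defined in this PR):
   
   ```python
   from pluggy import HookspecMarker

   hookspec = HookspecMarker("airflow")


   @hookspec
   def on_asset_event(asset, event):
       """Run when an asset event is emitted.

       Unlike ``on_asset_changed(asset=...)``, the ``event`` argument could
       carry *what* happened, e.g. the partition key.
       """
   ```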



##########
airflow-core/src/airflow/serialization/serialized_objects.py:
##########
@@ -459,6 +472,45 @@ def decode_timetable(var: dict[str, Any]) -> Timetable:
     return timetable_class.deserialize(var[Encoding.VAR])
 
 
+def _load_partition_mapper(importable_string) -> PartitionMapper | None:
+    if importable_string.startswith("airflow.timetables."):
+        return import_string(importable_string)
+    else:
+        return None

Review Comment:
   ```suggestion
       if importable_string.startswith("airflow.timetables."):
           return import_string(importable_string)
       return None
   ```



##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -229,12 +242,39 @@ def notify_asset_alias_created(asset_assets: AssetAlias):
     def notify_asset_changed(asset: Asset):
         """Run applicable notification actions when an asset is changed."""
         try:
+            # TODO: AIP-76 this will have to change. needs to know *what* happened to the asset (e.g. partition key)
+            #  maybe we should just add the event to the signature
+            #  or add a new hook `on_asset_event`
             get_listener_manager().hook.on_asset_changed(asset=asset)
         except Exception:
             log.exception("error calling listener")
 
     @classmethod
-    def _queue_dagruns(cls, asset_id: int, dags_to_queue: set[DagModel], session: Session) -> None:
+    def _queue_dagruns(
+        cls,
+        asset_id: int,

Review Comment:
   ```suggestion
           cls,
           *,
           asset_id: int,
   ```
   
   since we only call it with keyword arguments, it's not a bad idea to make it keyword-only
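   
   A minimal sketch of the effect (the call sites below are hypothetical):
   
   ```python
   # keyword-only parameters after ``*`` reject positional calls,
   # so call sites stay self-documenting
   AssetManager._queue_dagruns(asset_id=1, dags_to_queue=dags, session=session)  # OK
   AssetManager._queue_dagruns(1, dags, session)  # TypeError
   ```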



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1317,6 +1317,8 @@ def register_asset_changes_in_db(
     ) -> None:
         from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetNameRef, AssetUniqueKey, AssetUriRef
 
+        # TODO: AIP-76 should we provide an interface to override this?

Review Comment:
   Why should we? What's the use case in your mind?



##########
airflow-core/src/airflow/example_dags/example_outlet_event_extra.py:
##########
@@ -68,6 +68,9 @@ def asset_with_extra_by_context(*, outlet_events=None):
 
     def _asset_with_extra_from_classic_operator_post_execute(context, result):
         context["outlet_events"][asset].extra = {"hi": "bye"}
+        # TODO: AIP-76 probably we want to make it so this could be

Review Comment:
   In the current syntax, `context["outlet_events"][asset]` works more like operating on the events of an asset. Are you suggesting something like
   
   `context["outlet_events"][asset].events = ...`?



##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -243,15 +283,94 @@ def _queue_dagruns(cls, asset_id: int, dags_to_queue: set[DagModel], session: Se
         # "fallback" to running this in a nested transaction. This is needed
         # so that the adding of these rows happens in the same transaction
         # where `ti.state` is changed.
-        if not dags_to_queue:
-            return
-
         if get_dialect_name(session) == "postgresql":
-            return cls._postgres_queue_dagruns(asset_id, dags_to_queue, session)
-        return cls._slow_path_queue_dagruns(asset_id, dags_to_queue, session)
+            return cls._queue_dagruns_nonpartitioned_postgres(asset_id, dags_to_queue, session)
+        return cls._queue_dagruns_nonpartitioned_slow_path(asset_id, dags_to_queue, session)
 
     @classmethod
-    def _slow_path_queue_dagruns(cls, asset_id: int, dags_to_queue: set[DagModel], session: Session) -> None:
+    def _queue_partitioned_dags(
+        cls,
+        asset_id: int,
+        all_dags: set[DagModel],
+        event: AssetEvent,
+        partition_key: str | None,
+        session: Session,
+    ) -> list[DagModel]:
+        # TODO: AIP-76 there may be a better way to identify that timetable is partition-driven
+        partition_dags = [x for x in all_dags if x.timetable_summary == "Partitioned Asset"]
+        if partition_dags and not partition_key:
+            # TODO: AIP-76 how to best ensure users can see this? Probably add Log record.
+            log.warning(
+                "Listening dags are partition-aware but run has no partition key",
+                listening_dags=[x.dag_id for x in partition_dags],
+                asset_id=asset_id,
+                run_id=event.source_run_id,
+                dag_id=event.source_dag_id,
+                task_id=event.source_task_id,
+            )
+            return partition_dags
+
+        for target_dag in partition_dags:
+            if TYPE_CHECKING:
+                assert partition_key is not None
+            from airflow.models.serialized_dag import SerializedDagModel
+
+            serdag = SerializedDagModel.get(dag_id=target_dag.dag_id, session=session)
+            if not serdag:
+                raise RuntimeError(f"Could not find serialized dag for dag_id={target_dag.dag_id}")
+            timetable = serdag.dag.timetable
+            if TYPE_CHECKING:
+                assert isinstance(timetable, PartitionedAssetTimetable)
+            target_key = timetable.partition_mapper.to_downstream(partition_key)
+
+            apdr = cls._get_or_create_apdr(
+                target_key=target_key,
+                target_dag=target_dag,
+                session=session,
+            )
+            log_record = PartitionedAssetKeyLog(
+                asset_id=asset_id,
+                asset_event_id=event.id,
+                asset_partition_dag_run_id=apdr.id,
+                source_partition_key=partition_key,
+                target_dag_id=target_dag.dag_id,
+                target_partition_key=target_key,
+            )
+            session.add(log_record)
+        return partition_dags
+
+    @classmethod
+    def _get_or_create_apdr(
+        cls,
+        target_key: str,
+        target_dag: SerializedDagModel,
+        session: Session,
+    ):

Review Comment:
   ```suggestion
       ) -> AssetPartitionDagRun:
   ```



##########
airflow-core/src/airflow/timetables/simple.py:
##########
@@ -217,3 +218,66 @@ def next_dagrun_info(
         restriction: TimeRestriction,
     ) -> DagRunInfo | None:
         return None
+
+
+class PartitionMapper:

Review Comment:
   ```suggestion
   class PartitionMapper(ABC):
   ```
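   
   For reference, a minimal sketch of the ABC form (only `to_downstream` is visible in this PR's hunks; anything beyond it is an assumption):
   
   ```python
   from abc import ABC, abstractmethod


   class PartitionMapper(ABC):
       """Maps an upstream partition key to a downstream partition key."""

       @abstractmethod
       def to_downstream(self, key: str) -> str:
           """Translate a source partition key into the target DAG's key."""
   ```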



##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -243,15 +283,94 @@ def _queue_dagruns(cls, asset_id: int, dags_to_queue: set[DagModel], session: Se
         # "fallback" to running this in a nested transaction. This is needed
         # so that the adding of these rows happens in the same transaction
         # where `ti.state` is changed.
-        if not dags_to_queue:
-            return
-
         if get_dialect_name(session) == "postgresql":
-            return cls._postgres_queue_dagruns(asset_id, dags_to_queue, session)
-        return cls._slow_path_queue_dagruns(asset_id, dags_to_queue, session)
+            return cls._queue_dagruns_nonpartitioned_postgres(asset_id, dags_to_queue, session)
+        return cls._queue_dagruns_nonpartitioned_slow_path(asset_id, dags_to_queue, session)
 
     @classmethod
-    def _slow_path_queue_dagruns(cls, asset_id: int, dags_to_queue: set[DagModel], session: Session) -> None:
+    def _queue_partitioned_dags(
+        cls,
+        asset_id: int,
+        all_dags: set[DagModel],
+        event: AssetEvent,
+        partition_key: str | None,
+        session: Session,
+    ) -> list[DagModel]:
+        # TODO: AIP-76 there may be a better way to identify that timetable is partition-driven
+        partition_dags = [x for x in all_dags if x.timetable_summary == "Partitioned Asset"]
+        if partition_dags and not partition_key:
+            # TODO: AIP-76 how to best ensure users can see this? Probably add Log record.
+            log.warning(
+                "Listening dags are partition-aware but run has no partition key",
+                listening_dags=[x.dag_id for x in partition_dags],
+                asset_id=asset_id,
+                run_id=event.source_run_id,
+                dag_id=event.source_dag_id,
+                task_id=event.source_task_id,
+            )

Review Comment:
   Should we move this logic up into `_queue_dagruns`, and run only the per-DAG partition logic that follows in `_queue_partitioned_dags`?
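   
   A rough sketch of that shape (names follow the PR; the exact split is an assumption):
   
   ```python
   @classmethod
   def _queue_dagruns(cls, *, asset_id, dags_to_queue, event, partition_key, session):
       partition_dags = [d for d in dags_to_queue if d.timetable_summary == "Partitioned Asset"]
       if partition_dags and not partition_key:
           log.warning(
               "Listening dags are partition-aware but run has no partition key",
               asset_id=asset_id,
           )
       elif partition_dags:
           cls._queue_partitioned_dags(asset_id, partition_dags, event, partition_key, session)
       # ... existing non-partitioned queueing for the remaining dags ...
   ```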



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]