kaxil commented on code in PR #64625:
URL: https://github.com/apache/airflow/pull/64625#discussion_r3028046920


##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -1013,14 +1032,15 @@ def add_asset_trigger_references(
 
         for name_uri, asset in self.assets.items():
            # If the asset belong to a DAG not active or paused, consider there is no watcher associated to it
-            asset_watcher_triggers = (
-                [
-                    {**encode_trigger(watcher.trigger), "watcher_name": watcher.name}
+            if name_uri in active_assets:
+                asset_watcher_triggers = [
+                    {"classpath": cp, "kwargs": kw, "watcher_name": watcher.name}
                     for watcher in asset.watchers
+                    for cp, kw in [_get_raw_trigger_kwargs(watcher.trigger)]

Review Comment:
   The fix looks correct for the reported bug, but there's no regression test 
verifying the actual symptom: that calling `add_asset_trigger_references` a 
second time is a no-op (same trigger IDs, no deletes/inserts). The existing 
tests check trigger count but not idempotency across parsing loops, which is 
where this bug manifested.
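
   A regression test could run the sync twice and assert nothing changed. A minimal self-contained sketch of the pattern, where `sync_triggers` and the dict-backed "trigger table" are hypothetical stand-ins for `add_asset_trigger_references` and the DB session:

```python
# Hypothetical stand-in for add_asset_trigger_references: sync a dict-backed
# "trigger table" to the wanted set, reusing existing rows when unchanged.
def sync_triggers(db: dict, wanted: list[tuple[str, str]]) -> None:
    next_id = max(db.values(), default=0) + 1
    for key in wanted:
        if key not in db:        # insert only genuinely new triggers
            db[key] = next_id
            next_id += 1
    for key in list(db):         # delete triggers no longer referenced
        if key not in set(wanted):
            del db[key]

def test_second_sync_is_noop():
    db: dict = {}
    wanted = [("pkg.TriggerA", "{'x': 1}"), ("pkg.TriggerB", "{'y': 2}")]
    sync_triggers(db, wanted)
    ids_after_first = dict(db)
    sync_triggers(db, wanted)          # simulate the next parsing loop
    assert db == ids_after_first       # same trigger IDs: no deletes/inserts

test_second_sync_is_noop()
```

   The real test would compare `Trigger` row IDs in the session before and after a second call, which is exactly the symptom the bug report described.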



##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -1001,7 +1001,26 @@ def add_task_asset_references(
     def add_asset_trigger_references(
         self, assets: dict[tuple[str, str], AssetModel], *, session: Session
     ) -> None:
-        from airflow.serialization.encoders import encode_trigger
+        def _get_raw_trigger_kwargs(trigger):
+            """
+            Extract classpath and raw Python kwargs from a trigger.
+
+            Unlike encode_trigger, this does NOT wrap values with 
BaseSerialization.
+            For dict triggers (from deserialized DAGs), any 
BaseSerialization-wrapped
+            values are unwrapped back to Python objects, ensuring hash 
consistency
+            with kwargs read back from the DB via Trigger._decrypt_kwargs.
+            """
+            if isinstance(trigger, dict):
+                classpath = trigger["classpath"]
+                raw_kwargs = trigger["kwargs"]
+                unwrapped = {}

Review Comment:
   The per-value unwrapping only checks for dicts with `Encoding.TYPE`. If a 
kwarg value is a list containing serialized items (e.g., `[{"__type": "tuple", 
"__var": [1, 2]}]`), the list elements stay wrapped and `BaseEventTrigger.hash` 
would still produce inconsistent results. `smart_decode_trigger_kwargs` from 
`airflow.serialization.decoders` already handles this pattern -- you could 
apply it per-value instead of manually checking:
   
   ```python
   from airflow.serialization.decoders import smart_decode_trigger_kwargs
   unwrapped = {k: smart_decode_trigger_kwargs(v) for k, v in raw_kwargs.items()}
   ```



##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -1001,7 +1001,26 @@ def add_task_asset_references(
     def add_asset_trigger_references(
         self, assets: dict[tuple[str, str], AssetModel], *, session: Session
     ) -> None:
-        from airflow.serialization.encoders import encode_trigger
+        def _get_raw_trigger_kwargs(trigger):

Review Comment:
   This function doesn't close over any locals from 
`add_asset_trigger_references`. Defining it as a nested function means it gets 
re-created on every call. Moving it to module level (like 
`_find_active_assets`) or making it a `@staticmethod` would avoid that and make 
it independently testable.
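
   A quick illustration of the re-creation point: a nested `def` builds a fresh function object on every call to the enclosing function, whereas a module-level helper is defined once.

```python
# Nested defs are evaluated each time the outer function runs,
# producing a new function object per call.
def outer():
    def helper(x):    # re-created on every call to outer()
        return x + 1
    return helper

assert outer() is not outer()   # two calls, two distinct function objects

# A module-level helper (or a @staticmethod) is created once at import time
# and can be imported and unit-tested directly.
def module_level_helper(x):
    return x + 1
```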



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to