vincbeck commented on code in PR #44797:
URL: https://github.com/apache/airflow/pull/44797#discussion_r1878452631


##########
airflow/dag_processing/collection.py:
##########
@@ -547,3 +547,19 @@ def add_asset_trigger_references(
     def _encrypt_trigger_kwargs(trigger: BaseTrigger) -> tuple[str, str]:
         classpath, kwargs = trigger.serialize()
         return classpath, Trigger.encrypt_kwargs(kwargs)
+
+    @staticmethod
+    def _get_trigger_hash(classpath: str, kwargs: dict[str, Any]) -> int:
+        """
+        Return the hash of the trigger classpath and kwargs. This is used to uniquely identify a trigger.
+
+        We do not want to move this logic in a `__hash__` method in `BaseTrigger` because we do not want to
+        make the triggers hashable. The reason being, when the triggerer retrieve the list of triggers, we do
+        not want it dedupe them. When used to defer tasks, 2 triggers can have the same classpath and kwargs.
+        This is not true for event driven scheduling.
+        """
+        return hash((classpath, frozenset(kwargs.items())))

Review Comment:
   Unfortunately, `classpath` and `kwargs` are the only information available to
   compare 2 triggers, so I have no choice but to use these 2 pieces of
   information. One option could be a helper that "tries" to handle all types,
   like:
   
   ```
   def hash_dict(obj: dict) -> int:
       def hash_any(value):
           """Handles hashing for any type of object."""
           if isinstance(value, dict):
               # Recursively hash nested dictionaries
               return hash_dict(value)
           elif isinstance(value, list):
               # Hash list as tuple of hashes
               return hash(tuple(hash_any(item) for item in value))
           elif isinstance(value, set):
               # Hash set as frozenset
               return hash(frozenset(hash_any(item) for item in value))
           elif isinstance(value, tuple):
               # Handle nested tuples
               return hash(tuple(hash_any(item) for item in value))
           elif isinstance(value, (int, float, str, bool, type(None))):
               # Simple hashable types
               return hash(value)
           else:
               raise ValueError(f"Unsupported type for hashing: {type(value)}")

       # Create a sorted tuple of key-value pairs (keys and values are recursively hashed)
       sorted_items = tuple(sorted((hash_any(k), hash_any(v)) for k, v in obj.items()))
       return hash(sorted_items)
   ```
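
To make the motivation concrete, here is a minimal, self-contained sketch of why the flat `frozenset(kwargs.items())` approach can fail and how a recursive helper avoids it. The names `get_trigger_hash`, `my.module.MyTrigger` and the sample `kwargs` are hypothetical and only for illustration; this is not the code in the PR, and it assumes kwargs contain only JSON-like values.

```python
from typing import Any


def hash_any(value: Any) -> int:
    """Recursively hash JSON-like values, including unhashable containers."""
    if isinstance(value, dict):
        # Sorting the (key hash, value hash) pairs makes the result
        # independent of the dict's insertion order.
        return hash(tuple(sorted((hash_any(k), hash_any(v)) for k, v in value.items())))
    if isinstance(value, (list, tuple)):
        return hash(tuple(hash_any(item) for item in value))
    if isinstance(value, (set, frozenset)):
        return hash(frozenset(hash_any(item) for item in value))
    if isinstance(value, (int, float, str, bool, type(None))):
        return hash(value)
    raise ValueError(f"Unsupported type for hashing: {type(value)}")


def get_trigger_hash(classpath: str, kwargs: dict[str, Any]) -> int:
    # Hypothetical variant of `_get_trigger_hash` built on the recursive helper.
    return hash((classpath, hash_any(kwargs)))


# Sample kwargs (assumed) with a list and a nested dict as values.
kwargs = {"paths": ["a", "b"], "options": {"retries": 3}}

try:
    # The flat approach must hash each (key, value) pair, so a list value fails.
    hash(("my.module.MyTrigger", frozenset(kwargs.items())))
    flat_hash_failed = False
except TypeError:
    flat_hash_failed = True

recursive_hash = get_trigger_hash("my.module.MyTrigger", kwargs)
```

Note that Python salts `str` hashes per process by default, so values produced this way are only comparable within a single process unless `PYTHONHASHSEED` is fixed.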



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
