ashb commented on code in PR #57620:
URL: https://github.com/apache/airflow/pull/57620#discussion_r2517970953


##########
airflow-core/src/airflow/lineage/hook.py:
##########
@@ -88,19 +108,44 @@ def __init__(self, **kwargs):
         self._input_counts: dict[str, int] = defaultdict(int)
         self._output_counts: dict[str, int] = defaultdict(int)
         self._asset_factories = ProvidersManager().asset_factories
+        self._extra_counts: dict[str, int] = defaultdict(int)
+        self._extra: dict[str, tuple[str, Any, LineageContext]] = {}
+
+    @staticmethod
+    def _generate_hash(value: Any) -> str:
+        """
+        Generate a deterministic MD5 hash for the given value.
+
+        If the value is dictionary it's JSON-serialized with `sort_keys=True`, 
and unsupported types
+        are converted to strings (`default=str`) to favor producing a hash 
rather than raising an error,
+        even if that means a less precise encoding.
+        """
+        extra_str = json.dumps(value, sort_keys=True, default=str)
+        extra_hash = hashlib.md5(extra_str.encode()).hexdigest()

Review Comment:
   Nit: this function can be called with anything (it just happens to be
called with `asset.extra` at the moment), so I don't think these two variables
should be called `extra_*`.



##########
airflow-core/src/airflow/lineage/hook.py:
##########
@@ -198,31 +243,55 @@ def add_output_asset(
             scheme=scheme, uri=uri, name=name, group=group, 
asset_kwargs=asset_kwargs, asset_extra=asset_extra
         )
         if asset:
-            key = self._generate_key(asset, context)
-            if key not in self._outputs:
-                self._outputs[key] = (asset, context)
-            self._output_counts[key] += 1
+            entry_id = self._generate_asset_entry_id(asset=asset, 
context=context)
+            if entry_id not in self._outputs:
+                self._outputs[entry_id] = (asset, context)
+            self._output_counts[entry_id] += 1
         if len(self._outputs) == MAX_COLLECTED_ASSETS:
             self.log.warning("Maximum number of asset outputs exceeded. 
Skipping subsequent inputs.")
 
+    def add_extra(
+        self,
+        context: LineageContext,
+        key: str,
+        value: Any,
+    ):
+        """Add the extra information and its corresponding hook execution 
context to the collector."""
+        if len(self._extra) >= MAX_COLLECTED_EXTRA:
+            self.log.debug("Maximum number of extra exceeded. Skipping.")
+            return
+        if not key or not value:
+            self.log.debug("Missing required parameter: both 'key' and 'value' 
must be provided.")
+            return
+        entry_id = self._generate_extra_entry_id(key=key, value=value, 
context=context)
+        if entry_id not in self._extra:
+            self._extra[entry_id] = (key, value, context)
+        self._extra_counts[entry_id] += 1
+        if len(self._extra) == MAX_COLLECTED_EXTRA:
+            self.log.warning("Maximum number of extra exceeded. Skipping 
subsequent inputs.")
+
     @property
     def collected_assets(self) -> HookLineage:
         """Get the collected hook lineage information."""
         return HookLineage(
-            [
+            inputs=[
                 AssetLineageInfo(asset=asset, count=self._input_counts[key], 
context=context)
                 for key, (asset, context) in self._inputs.items()
             ],
-            [
+            outputs=[
                 AssetLineageInfo(asset=asset, count=self._output_counts[key], 
context=context)
                 for key, (asset, context) in self._outputs.items()
             ],
+            extra=[
+                ExtraLineageInfo(key=key, value=value, 
count=self._extra_counts[count_key], context=context)
+                for count_key, (key, value, context) in self._extra.items()
+            ],
         )
 
     @property
     def has_collected(self) -> bool:
         """Check if any assets have been collected."""
-        return len(self._inputs) != 0 or len(self._outputs) != 0
+        return len(self._inputs) != 0 or len(self._outputs) != 0 or 
len(self._extra) != 0

Review Comment:
   Nit: `len` is going to count everything, so it is more "expensive" than it
needs to be. The collections are likely not long enough for this to matter,
but since we are changing things anyway:
   
   ```suggestion
           return self._inputs or self._outputs or self._extra
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to