kacpermuda commented on code in PR #57620:
URL: https://github.com/apache/airflow/pull/57620#discussion_r2518027767


##########
airflow-core/src/airflow/lineage/hook.py:
##########
@@ -198,31 +243,55 @@ def add_output_asset(
             scheme=scheme, uri=uri, name=name, group=group, 
asset_kwargs=asset_kwargs, asset_extra=asset_extra
         )
         if asset:
-            key = self._generate_key(asset, context)
-            if key not in self._outputs:
-                self._outputs[key] = (asset, context)
-            self._output_counts[key] += 1
+            entry_id = self._generate_asset_entry_id(asset=asset, 
context=context)
+            if entry_id not in self._outputs:
+                self._outputs[entry_id] = (asset, context)
+            self._output_counts[entry_id] += 1
         if len(self._outputs) == MAX_COLLECTED_ASSETS:
             self.log.warning("Maximum number of asset outputs exceeded. 
Skipping subsequent inputs.")
 
+    def add_extra(
+        self,
+        context: LineageContext,
+        key: str,
+        value: Any,
+    ):
+        """Add the extra information and its corresponding hook execution 
context to the collector."""
+        if len(self._extra) >= MAX_COLLECTED_EXTRA:
+            self.log.debug("Maximum number of extra exceeded. Skipping.")
+            return
+        if not key or not value:
+            self.log.debug("Missing required parameter: both 'key' and 'value' 
must be provided.")
+            return
+        entry_id = self._generate_extra_entry_id(key=key, value=value, 
context=context)
+        if entry_id not in self._extra:
+            self._extra[entry_id] = (key, value, context)
+        self._extra_counts[entry_id] += 1
+        if len(self._extra) == MAX_COLLECTED_EXTRA:
+            self.log.warning("Maximum number of extra exceeded. Skipping 
subsequent inputs.")
+
     @property
     def collected_assets(self) -> HookLineage:
         """Get the collected hook lineage information."""
         return HookLineage(
-            [
+            inputs=[
                 AssetLineageInfo(asset=asset, count=self._input_counts[key], 
context=context)
                 for key, (asset, context) in self._inputs.items()
             ],
-            [
+            outputs=[
                 AssetLineageInfo(asset=asset, count=self._output_counts[key], 
context=context)
                 for key, (asset, context) in self._outputs.items()
             ],
+            extra=[
+                ExtraLineageInfo(key=key, value=value, 
count=self._extra_counts[count_key], context=context)
+                for count_key, (key, value, context) in self._extra.items()
+            ],
         )
 
     @property
     def has_collected(self) -> bool:
         """Check if any assets have been collected."""
-        return len(self._inputs) != 0 or len(self._outputs) != 0
+        return len(self._inputs) != 0 or len(self._outputs) != 0 or 
len(self._extra) != 0

Review Comment:
   Sure, changed it to `bool(self._inputs or self._outputs or self._extra)`. 
Since these attributes are dicts, the `or` chain on its own would evaluate to one of 
the dicts rather than a bool, hence the `bool()` wrapper to keep the `-> bool` contract.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to