ashb commented on code in PR #57620:
URL: https://github.com/apache/airflow/pull/57620#discussion_r2517970953
##########
airflow-core/src/airflow/lineage/hook.py:
##########
@@ -88,19 +108,44 @@ def __init__(self, **kwargs):
self._input_counts: dict[str, int] = defaultdict(int)
self._output_counts: dict[str, int] = defaultdict(int)
self._asset_factories = ProvidersManager().asset_factories
+ self._extra_counts: dict[str, int] = defaultdict(int)
+ self._extra: dict[str, tuple[str, Any, LineageContext]] = {}
+
+ @staticmethod
+ def _generate_hash(value: Any) -> str:
+ """
+ Generate a deterministic MD5 hash for the given value.
+
+ If the value is dictionary it's JSON-serialized with `sort_keys=True`,
and unsupported types
+ are converted to strings (`default=str`) to favor producing a hash
rather than raising an error,
+ even if that means a less precise encoding.
+ """
+ extra_str = json.dumps(value, sort_keys=True, default=str)
+ extra_hash = hashlib.md5(extra_str.encode()).hexdigest()
Review Comment:
Nit: this function can be called with anything (it just happens to be called
with `asset.extra` at the moment), so I don't think these two variables should
be named `extra_*`.
##########
airflow-core/src/airflow/lineage/hook.py:
##########
@@ -198,31 +243,55 @@ def add_output_asset(
scheme=scheme, uri=uri, name=name, group=group,
asset_kwargs=asset_kwargs, asset_extra=asset_extra
)
if asset:
- key = self._generate_key(asset, context)
- if key not in self._outputs:
- self._outputs[key] = (asset, context)
- self._output_counts[key] += 1
+ entry_id = self._generate_asset_entry_id(asset=asset,
context=context)
+ if entry_id not in self._outputs:
+ self._outputs[entry_id] = (asset, context)
+ self._output_counts[entry_id] += 1
if len(self._outputs) == MAX_COLLECTED_ASSETS:
self.log.warning("Maximum number of asset outputs exceeded.
Skipping subsequent inputs.")
+ def add_extra(
+ self,
+ context: LineageContext,
+ key: str,
+ value: Any,
+ ):
+ """Add the extra information and its corresponding hook execution
context to the collector."""
+ if len(self._extra) >= MAX_COLLECTED_EXTRA:
+ self.log.debug("Maximum number of extra exceeded. Skipping.")
+ return
+ if not key or not value:
+ self.log.debug("Missing required parameter: both 'key' and 'value'
must be provided.")
+ return
+ entry_id = self._generate_extra_entry_id(key=key, value=value,
context=context)
+ if entry_id not in self._extra:
+ self._extra[entry_id] = (key, value, context)
+ self._extra_counts[entry_id] += 1
+ if len(self._extra) == MAX_COLLECTED_EXTRA:
+ self.log.warning("Maximum number of extra exceeded. Skipping
subsequent inputs.")
+
@property
def collected_assets(self) -> HookLineage:
"""Get the collected hook lineage information."""
return HookLineage(
- [
+ inputs=[
AssetLineageInfo(asset=asset, count=self._input_counts[key],
context=context)
for key, (asset, context) in self._inputs.items()
],
- [
+ outputs=[
AssetLineageInfo(asset=asset, count=self._output_counts[key],
context=context)
for key, (asset, context) in self._outputs.items()
],
+ extra=[
+ ExtraLineageInfo(key=key, value=value,
count=self._extra_counts[count_key], context=context)
+ for count_key, (key, value, context) in self._extra.items()
+ ],
)
@property
def has_collected(self) -> bool:
"""Check if any assets have been collected."""
- return len(self._inputs) != 0 or len(self._outputs) != 0
+ return len(self._inputs) != 0 or len(self._outputs) != 0 or
len(self._extra) != 0
Review Comment:
Nit: `len` is going to count everything, so it is more "expensive" than it
needs to be. The lists likely aren't long enough for this to matter, but since
we are changing things anyway:
```suggestion
return self._inputs or self._outputs or self._extra
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]