kacpermuda commented on code in PR #57620:
URL: https://github.com/apache/airflow/pull/57620#discussion_r2518019512
##########
airflow-core/src/airflow/lineage/hook.py:
##########
@@ -88,19 +108,44 @@ def __init__(self, **kwargs):
self._input_counts: dict[str, int] = defaultdict(int)
self._output_counts: dict[str, int] = defaultdict(int)
self._asset_factories = ProvidersManager().asset_factories
+ self._extra_counts: dict[str, int] = defaultdict(int)
+ self._extra: dict[str, tuple[str, Any, LineageContext]] = {}
+
+ @staticmethod
+ def _generate_hash(value: Any) -> str:
+ """
+ Generate a deterministic MD5 hash for the given value.
+
+ If the value is dictionary it's JSON-serialized with `sort_keys=True`,
and unsupported types
+ are converted to strings (`default=str`) to favor producing a hash
rather than raising an error,
+ even if that means a less precise encoding.
+ """
+ extra_str = json.dumps(value, sort_keys=True, default=str)
+ extra_hash = hashlib.md5(extra_str.encode()).hexdigest()
Review Comment:
Correct, changed.
##########
airflow-core/src/airflow/lineage/hook.py:
##########
@@ -198,31 +243,55 @@ def add_output_asset(
scheme=scheme, uri=uri, name=name, group=group,
asset_kwargs=asset_kwargs, asset_extra=asset_extra
)
if asset:
- key = self._generate_key(asset, context)
- if key not in self._outputs:
- self._outputs[key] = (asset, context)
- self._output_counts[key] += 1
+ entry_id = self._generate_asset_entry_id(asset=asset,
context=context)
+ if entry_id not in self._outputs:
+ self._outputs[entry_id] = (asset, context)
+ self._output_counts[entry_id] += 1
if len(self._outputs) == MAX_COLLECTED_ASSETS:
self.log.warning("Maximum number of asset outputs exceeded.
Skipping subsequent inputs.")
+ def add_extra(
+ self,
+ context: LineageContext,
+ key: str,
+ value: Any,
+ ):
+ """Add the extra information and its corresponding hook execution
context to the collector."""
+ if len(self._extra) >= MAX_COLLECTED_EXTRA:
+ self.log.debug("Maximum number of extra exceeded. Skipping.")
+ return
+ if not key or not value:
+ self.log.debug("Missing required parameter: both 'key' and 'value'
must be provided.")
+ return
+ entry_id = self._generate_extra_entry_id(key=key, value=value,
context=context)
+ if entry_id not in self._extra:
+ self._extra[entry_id] = (key, value, context)
+ self._extra_counts[entry_id] += 1
+ if len(self._extra) == MAX_COLLECTED_EXTRA:
+ self.log.warning("Maximum number of extra exceeded. Skipping
subsequent inputs.")
+
@property
def collected_assets(self) -> HookLineage:
"""Get the collected hook lineage information."""
return HookLineage(
- [
+ inputs=[
AssetLineageInfo(asset=asset, count=self._input_counts[key],
context=context)
for key, (asset, context) in self._inputs.items()
],
- [
+ outputs=[
AssetLineageInfo(asset=asset, count=self._output_counts[key],
context=context)
for key, (asset, context) in self._outputs.items()
],
+ extra=[
+ ExtraLineageInfo(key=key, value=value,
count=self._extra_counts[count_key], context=context)
+ for count_key, (key, value, context) in self._extra.items()
+ ],
)
@property
def has_collected(self) -> bool:
"""Check if any assets have been collected."""
- return len(self._inputs) != 0 or len(self._outputs) != 0
+ return len(self._inputs) != 0 or len(self._outputs) != 0 or
len(self._extra) != 0
Review Comment:
Sure, I can change it to `bool(self._inputs or self._outputs or self._extra)`.
Since these are dicts, a bare `or` chain would return one of the dicts rather than
a bool, hence the `bool()` wrapper.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]