kacpermuda commented on code in PR #58057:
URL: https://github.com/apache/airflow/pull/58057#discussion_r2557077290
##########
providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py:
##########
@@ -16,24 +16,153 @@
# under the License.
from __future__ import annotations
-from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
+from typing import TYPE_CHECKING
+from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS
-def _get_asset_compat_hook_lineage_collector():
- from airflow.lineage.hook import get_hook_lineage_collector
+if TYPE_CHECKING:
+ from typing import Any
- collector = get_hook_lineage_collector()
+ from airflow.lineage.hook import LineageContext
+
+def _add_extra_polyfill(collector):
+ """
+ Add support for extra lineage information on Airflow versions < 3.2.
+
+ This polyfill adds the `add_extra` method and modifies `collected_assets`
property to include
+ extra lineage information. Should be called after renaming from dataset to
asset. It's rewriting
+ `collected_assets` method and not `collected_datasets` method.
+ """
+ # We already added it, skip
+ if hasattr(collector, "add_extra"):
+ return collector
+
+ import hashlib
+ import json
+ import types
+ from collections import defaultdict
+
+ import attr
+
+ from airflow.lineage.hook import HookLineage as _BaseHookLineage
+
+ # Add `extra` to HookLineage returned by `collected_assets` property
+ @attr.define
+ class ExtraLineageInfo:
+ """
+ Holds lineage information for arbitrary non-asset metadata.
+
+ This class represents additional lineage context captured during a
hook execution that is not
+ associated with a specific asset. It includes the metadata payload
itself, the count of
+ how many times it has been encountered, and the context in which it
was encountered.
+ """
+
+ key: str
+ value: Any
+ count: int
+ context: LineageContext
+
+ @attr.define
+ class HookLineage(_BaseHookLineage):
+ # mypy is not happy, as base class is using other ExtraLineageInfo,
but this code will never
+ # run on AF3.2, where this other one is used, so this is fine - we can
ignore.
+ extra: list[ExtraLineageInfo] = attr.field(factory=list) # type:
ignore[assignment]
+
+ # Initialize extra tracking attributes
+ collector._extra = {}
+ collector._extra_counts = defaultdict(int)
+
+ # Save the original `collected_assets` getter
+ _original_collected_assets = collector.__class__.collected_assets
+
+ def _compat_collected_assets(self) -> HookLineage:
+ """Get the collected hook lineage information."""
+ # call the original `collected_assets` getter
+ lineage = _original_collected_assets.fget(self)
+ extra_list = [
+ ExtraLineageInfo(
+ key=key,
+ value=value,
+ count=self._extra_counts[count_key],
+ context=context,
+ )
+ for count_key, (key, value, context) in self._extra.items()
+ ]
+ return HookLineage(
+ inputs=lineage.inputs,
+ outputs=lineage.outputs,
+ extra=extra_list,
+ )
+
+ setattr(
+ collector.__class__,
+ "collected_assets",
+ property(lambda c: _compat_collected_assets(c)),
+ )
Review Comment:
Correct, adjusted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]