ashb commented on code in PR #58057:
URL: https://github.com/apache/airflow/pull/58057#discussion_r2556041699
##########
providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py:
##########
@@ -16,24 +16,153 @@
# under the License.
from __future__ import annotations
-from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
+from typing import TYPE_CHECKING
+from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS
-def _get_asset_compat_hook_lineage_collector():
- from airflow.lineage.hook import get_hook_lineage_collector
+if TYPE_CHECKING:
+ from typing import Any
- collector = get_hook_lineage_collector()
+ from airflow.lineage.hook import LineageContext
+
+def _add_extra_polyfill(collector):
+ """
+ Add support for extra lineage information on Airflow versions < 3.2.
+
+ This polyfill adds the `add_extra` method and modifies `collected_assets`
property to include
+ extra lineage information. Should be called after renaming from dataset to
asset. It's rewriting
+ `collected_assets` method and not `collected_datasets` method.
+ """
+ # We already added it, skip
+ if hasattr(collector, "add_extra"):
+ return collector
+
+ import hashlib
+ import json
+ import types
+ from collections import defaultdict
+
+ import attr
+
+ from airflow.lineage.hook import HookLineage as _BaseHookLineage
+
+ # Add `extra` to HookLineage returned by `collected_assets` property
+ @attr.define
+ class ExtraLineageInfo:
+ """
+ Holds lineage information for arbitrary non-asset metadata.
+
+ This class represents additional lineage context captured during a
hook execution that is not
+ associated with a specific asset. It includes the metadata payload
itself, the count of
+ how many times it has been encountered, and the context in which it
was encountered.
+ """
+
+ key: str
+ value: Any
+ count: int
+ context: LineageContext
+
+ @attr.define
+ class HookLineage(_BaseHookLineage):
+ # mypy is not happy, as base class is using other ExtraLineageInfo,
but this code will never
+ # run on AF3.2, where this other one is used, so this is fine - we can
ignore.
+ extra: list[ExtraLineageInfo] = attr.field(factory=list) # type:
ignore[assignment]
+
+ # Initialize extra tracking attributes
+ collector._extra = {}
+ collector._extra_counts = defaultdict(int)
+
+ # Save the original `collected_assets` getter
+ _original_collected_assets = collector.__class__.collected_assets
+
+ def _compat_collected_assets(self) -> HookLineage:
+ """Get the collected hook lineage information."""
+ # call the original `collected_assets` getter
+ lineage = _original_collected_assets.fget(self)
+ extra_list = [
+ ExtraLineageInfo(
+ key=key,
+ value=value,
+ count=self._extra_counts[count_key],
+ context=context,
+ )
+ for count_key, (key, value, context) in self._extra.items()
+ ]
+ return HookLineage(
+ inputs=lineage.inputs,
+ outputs=lineage.outputs,
+ extra=extra_list,
+ )
+
+ setattr(
+ collector.__class__,
+ "collected_assets",
+ property(lambda c: _compat_collected_assets(c)),
+ )
Review Comment:
Does this need to be `setattr`?
I think this will work just as well:
```suggestion
type(collector).collected_assets = property(_compat_collected_assets)
```
##########
providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py:
##########
@@ -72,17 +202,34 @@ def collected_assets_compat(collector) -> HookLineage:
setattr(
collector.__class__,
"collected_assets",
- property(lambda collector: collected_assets_compat(collector)),
+ property(lambda c: _compat_collected_assets(c)),
)
+ # Add `add_extra` polyfill for AF 2.x, needs to be called after setting
`collected_assets`
+ collector = _add_extra_polyfill(collector)
+
return collector
def get_hook_lineage_collector():
- # Dataset has been renamed as Asset in 3.0
- if AIRFLOW_V_3_0_PLUS:
- from airflow.lineage.hook import get_hook_lineage_collector
+ """
+ Get hook lineage collector with appropriate compatibility layers.
+
+ - AF 2.x: Apply both dataset->asset rename and then `add_extra` polyfill
+ - AF 3.0-3.1: Apply only `add_extra` polyfill, no renaming needed
+ - AF 3.2+: Use native implementation (no renaming or polyfill needed)
+ """
+ from airflow.lineage.hook import get_hook_lineage_collector as
get_global_collector
+
+ global_collector = get_global_collector()
+
+ # AF 2.x: needs dataset->asset rename + `add_extra` polyfill
+ if not AIRFLOW_V_3_0_PLUS:
+ return _get_af2_asset_compat_hook_lineage_collector(global_collector)
- return get_hook_lineage_collector()
+ # AF 3.0-3.1: needs only polyfill for `add_extra`
+ if not AIRFLOW_V_3_2_PLUS:
+ return _add_extra_polyfill(global_collector)
Review Comment:
I feel uneasy about this version-based approach, especially as it isn't the
Airflow version we need to be targeting, but the apache-airflow-task-sdk
version: in the Airflow core 3.2 timeframe there is a decent chance we will have
removed the dependency, meaning TaskSDK could run without apache-airflow-core
installed at all.
Elsewhere we have:
```
if all(
getattr(collector, asset_method_name, None)
for asset_method_name in ("add_input_asset", "add_output_asset",
"collected_assets", "add_extra")
):
```
I think we need to do something similar here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]