This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e8b5d566879 HookLevelLineage can now add arbitrary data with add_extra
(#57620)
e8b5d566879 is described below
commit e8b5d5668791c838a34df3e6d782f6b8da612e8f
Author: Kacper Muda <[email protected]>
AuthorDate: Fri Nov 14 10:36:57 2025 +0100
HookLevelLineage can now add arbitrary data with add_extra (#57620)
Hook-level lineage is useful, but it lacks an option to attach
non-asset-related metadata collected from hooks (e.g. query_id or
query_text). Such metadata is also valuable in the context of lineage
and is desired by hook lineage readers. For example, having the executed
SQL query enables SQL parsing on the reader side to retrieve input and
output datasets, which can significantly improve lineage extraction
accuracy.
---
.../docs/administration-and-deployment/lineage.rst | 2 +
airflow-core/src/airflow/lineage/hook.py | 107 +++-
airflow-core/tests/unit/lineage/test_hook.py | 673 ++++++++++++++++++++-
3 files changed, 758 insertions(+), 24 deletions(-)
diff --git a/airflow-core/docs/administration-and-deployment/lineage.rst
b/airflow-core/docs/administration-and-deployment/lineage.rst
index c914e0aa5b2..a47d043c112 100644
--- a/airflow-core/docs/administration-and-deployment/lineage.rst
+++ b/airflow-core/docs/administration-and-deployment/lineage.rst
@@ -28,6 +28,7 @@ This functionality helps you understand how data flows
throughout your Airflow p
A global instance of ``HookLineageCollector`` serves as the central hub for
collecting lineage information.
Hooks can send details about assets they interact with to this collector.
The collector then uses this data to construct AIP-60 compliant Assets, a
standard format for describing assets.
+Hooks can also send arbitrary non-asset related data to this collector as
shown in the example below.
.. code-block:: python
@@ -40,6 +41,7 @@ The collector then uses this data to construct AIP-60
compliant Assets, a standa
collector = get_hook_lineage_collector()
collector.add_input_asset(self, asset_kwargs={"scheme": "file",
"path": "/tmp/in"})
collector.add_output_asset(self, asset_kwargs={"scheme": "file",
"path": "/tmp/out"})
+ collector.add_extra(self, key="external_system_job_id",
value="some_id_123")
Lineage data collected by the ``HookLineageCollector`` can be accessed using
an instance of ``HookLineageReader``,
which is registered in an Airflow plugin.
diff --git a/airflow-core/src/airflow/lineage/hook.py
b/airflow-core/src/airflow/lineage/hook.py
index ede28a7855f..660ef12abe7 100644
--- a/airflow-core/src/airflow/lineage/hook.py
+++ b/airflow-core/src/airflow/lineage/hook.py
@@ -20,7 +20,7 @@ from __future__ import annotations
import hashlib
import json
from collections import defaultdict
-from typing import TYPE_CHECKING, TypeAlias
+from typing import TYPE_CHECKING, Any, TypeAlias
import attr
@@ -43,6 +43,9 @@ _hook_lineage_collector: HookLineageCollector | None = None
# Input assets and output assets are collected separately.
MAX_COLLECTED_ASSETS = 100
+# Maximum number of extra metadata that can be collected in a single hook
execution.
+MAX_COLLECTED_EXTRA = 200
+
@attr.define
class AssetLineageInfo:
@@ -58,6 +61,22 @@ class AssetLineageInfo:
context: LineageContext
[email protected]
+class ExtraLineageInfo:
+ """
+ Holds lineage information for arbitrary non-asset metadata.
+
+ This class represents additional lineage context captured during a hook
execution that is not
+ associated with a specific asset. It includes the metadata payload itself,
the count of
+ how many times it has been encountered, and the context in which it was
encountered.
+ """
+
+ key: str
+ value: Any
+ count: int
+ context: LineageContext
+
+
@attr.define
class HookLineage:
"""
@@ -70,6 +89,7 @@ class HookLineage:
inputs: list[AssetLineageInfo] = attr.ib(factory=list)
outputs: list[AssetLineageInfo] = attr.ib(factory=list)
+ extra: list[ExtraLineageInfo] = attr.ib(factory=list)
class HookLineageCollector(LoggingMixin):
@@ -88,19 +108,44 @@ class HookLineageCollector(LoggingMixin):
self._input_counts: dict[str, int] = defaultdict(int)
self._output_counts: dict[str, int] = defaultdict(int)
self._asset_factories = ProvidersManager().asset_factories
+ self._extra_counts: dict[str, int] = defaultdict(int)
+ self._extra: dict[str, tuple[str, Any, LineageContext]] = {}
+
+ @staticmethod
+ def _generate_hash(value: Any) -> str:
+ """
+ Generate a deterministic MD5 hash for the given value.
+
+ If the value is dictionary it's JSON-serialized with `sort_keys=True`,
and unsupported types
+ are converted to strings (`default=str`) to favor producing a hash
rather than raising an error,
+ even if that means a less precise encoding.
+ """
+ value_str = json.dumps(value, sort_keys=True, default=str)
+ value_hash = hashlib.md5(value_str.encode()).hexdigest()
+ return value_hash
- def _generate_key(self, asset: Asset, context: LineageContext) -> str:
+ def _generate_asset_entry_id(self, asset: Asset, context: LineageContext)
-> str:
"""
Generate a unique key for the given asset and context.
This method creates a unique key by combining the asset URI, the MD5
hash of the asset's extra
- dictionary, and the LineageContext's unique identifier. This ensures
that the generated key is
+ dictionary, and the LineageContext's unique identifier. This ensures
that the generated entry_id is
unique for each combination of asset and context.
"""
- extra_str = json.dumps(asset.extra, sort_keys=True)
- extra_hash = hashlib.md5(extra_str.encode()).hexdigest()
+ extra_hash = self._generate_hash(value=asset.extra)
return f"{asset.uri}_{extra_hash}_{id(context)}"
+ def _generate_extra_entry_id(self, key: str, value: Any, context:
LineageContext) -> str:
+ """
+ Generate a unique key for the given extra lineage information and
context.
+
+ This method creates a unique key by combining the extra information
key, an MD5 hash of the value,
+ and the LineageContext's unique identifier. This ensures that the
generated entry_id is unique
+ for each combination of extra lineage information and context.
+ """
+ value_hash = self._generate_hash(value=value)
+ return f"{key}_{value_hash}_{id(context)}"
+
def create_asset(
self,
*,
@@ -173,10 +218,10 @@ class HookLineageCollector(LoggingMixin):
scheme=scheme, uri=uri, name=name, group=group,
asset_kwargs=asset_kwargs, asset_extra=asset_extra
)
if asset:
- key = self._generate_key(asset, context)
- if key not in self._inputs:
- self._inputs[key] = (asset, context)
- self._input_counts[key] += 1
+ entry_id = self._generate_asset_entry_id(asset, context)
+ if entry_id not in self._inputs:
+ self._inputs[entry_id] = (asset, context)
+ self._input_counts[entry_id] += 1
if len(self._inputs) == MAX_COLLECTED_ASSETS:
self.log.warning("Maximum number of asset inputs exceeded.
Skipping subsequent inputs.")
@@ -198,31 +243,55 @@ class HookLineageCollector(LoggingMixin):
scheme=scheme, uri=uri, name=name, group=group,
asset_kwargs=asset_kwargs, asset_extra=asset_extra
)
if asset:
- key = self._generate_key(asset, context)
- if key not in self._outputs:
- self._outputs[key] = (asset, context)
- self._output_counts[key] += 1
+ entry_id = self._generate_asset_entry_id(asset=asset,
context=context)
+ if entry_id not in self._outputs:
+ self._outputs[entry_id] = (asset, context)
+ self._output_counts[entry_id] += 1
if len(self._outputs) == MAX_COLLECTED_ASSETS:
self.log.warning("Maximum number of asset outputs exceeded.
Skipping subsequent inputs.")
+ def add_extra(
+ self,
+ context: LineageContext,
+ key: str,
+ value: Any,
+ ):
+ """Add the extra information and its corresponding hook execution
context to the collector."""
+ if len(self._extra) >= MAX_COLLECTED_EXTRA:
+ self.log.debug("Maximum number of extra exceeded. Skipping.")
+ return
+ if not key or not value:
+ self.log.debug("Missing required parameter: both 'key' and 'value'
must be provided.")
+ return
+ entry_id = self._generate_extra_entry_id(key=key, value=value,
context=context)
+ if entry_id not in self._extra:
+ self._extra[entry_id] = (key, value, context)
+ self._extra_counts[entry_id] += 1
+ if len(self._extra) == MAX_COLLECTED_EXTRA:
+ self.log.warning("Maximum number of extra exceeded. Skipping
subsequent inputs.")
+
@property
def collected_assets(self) -> HookLineage:
"""Get the collected hook lineage information."""
return HookLineage(
- [
+ inputs=[
AssetLineageInfo(asset=asset, count=self._input_counts[key],
context=context)
for key, (asset, context) in self._inputs.items()
],
- [
+ outputs=[
AssetLineageInfo(asset=asset, count=self._output_counts[key],
context=context)
for key, (asset, context) in self._outputs.items()
],
+ extra=[
+ ExtraLineageInfo(key=key, value=value,
count=self._extra_counts[count_key], context=context)
+ for count_key, (key, value, context) in self._extra.items()
+ ],
)
@property
def has_collected(self) -> bool:
"""Check if any assets have been collected."""
- return len(self._inputs) != 0 or len(self._outputs) != 0
+ return bool(self._inputs or self._outputs or self._extra)
class NoOpCollector(HookLineageCollector):
@@ -238,6 +307,9 @@ class NoOpCollector(HookLineageCollector):
def add_output_asset(self, *_, **__):
pass
+ def add_extra(self, *_, **__):
+ pass
+
@property
def collected_assets(
self,
@@ -245,13 +317,14 @@ class NoOpCollector(HookLineageCollector):
self.log.debug(
"Data lineage tracking is disabled. Register a hook lineage reader
to start tracking hook lineage."
)
- return HookLineage([], [])
+ return HookLineage([], [], [])
class HookLineageReader(LoggingMixin):
"""Class used to retrieve the hook lineage information collected by
HookLineageCollector."""
def __init__(self, **kwargs):
+ super().__init__(**kwargs)
self.lineage_collector = get_hook_lineage_collector()
def retrieve_hook_lineage(self) -> HookLineage:
diff --git a/airflow-core/tests/unit/lineage/test_hook.py
b/airflow-core/tests/unit/lineage/test_hook.py
index 26f6b802ff6..ee55d20b32e 100644
--- a/airflow-core/tests/unit/lineage/test_hook.py
+++ b/airflow-core/tests/unit/lineage/test_hook.py
@@ -38,10 +38,74 @@ from tests_common.test_utils.mock_plugins import
mock_plugin_manager
class TestHookLineageCollector:
- @pytest.fixture
- def collector(self, scope="method"):
+ @pytest.fixture # default scope is function
+ def collector(self):
return HookLineageCollector()
+ def test_generate_hash_handles_non_serializable(self, collector):
+ class Obj:
+ def __str__(self):
+ return "<obj>"
+
+ h1 = collector._generate_hash({"a": Obj()})
+ h2 = collector._generate_hash({"a": "<obj>"})
+ assert isinstance(h1, str)
+ assert h1 == h2
+
+ def test_generate_hash_is_deterministic(self, collector):
+ h1 = collector._generate_hash({"foo": "bar"})
+ h2 = collector._generate_hash({"foo": "bar"})
+ assert h1 == h2
+
+ def test_generate_hash_changes_with_value(self, collector):
+ h1 = collector._generate_hash({"foo": "bar"})
+ h2 = collector._generate_hash({"foo": "different"})
+ assert h1 != h2
+
+ def test_generate_asset_entry_id_deterministic(self, collector):
+ asset = Asset(uri="s3://bucket/file", extra={"x": 1})
+ ctx = BaseHook()
+ key1 = collector._generate_asset_entry_id(asset, ctx)
+ key2 = collector._generate_asset_entry_id(asset, ctx)
+ assert key1 == key2
+
+ def test_generate_asset_entry_id_differs_by_context(self, collector):
+ asset = Asset(uri="s3://bucket/file")
+ ctx1 = BaseHook()
+ ctx2 = BaseHook()
+ key1 = collector._generate_asset_entry_id(asset, ctx1)
+ key2 = collector._generate_asset_entry_id(asset, ctx2)
+ assert key1 != key2
+
+ def test_generate_asset_entry_id_differs_by_extra(self, collector):
+ ctx = BaseHook()
+ asset1 = Asset(uri="s3://bucket/file", extra={"foo": "bar"})
+ asset2 = Asset(uri="s3://bucket/file", extra={"foo": "other"})
+ key1 = collector._generate_asset_entry_id(asset1, ctx)
+ key2 = collector._generate_asset_entry_id(asset2, ctx)
+ assert key1 != key2
+
+ def test_generate_extra_entry_id_deterministic(self, collector):
+ ctx = BaseHook()
+ key1 = collector._generate_extra_entry_id("k", "v", ctx)
+ key2 = collector._generate_extra_entry_id("k", "v", ctx)
+ assert key1 == key2
+
+ def test_generate_extra_entry_id_differs_by_context(self, collector):
+ ctx1 = BaseHook()
+ ctx2 = BaseHook()
+ key1 = collector._generate_extra_entry_id("k", "v", ctx1)
+ key2 = collector._generate_extra_entry_id("k", "v", ctx2)
+ assert key1 != key2
+
+ def test_generate_extra_entry_id_differs_by_key_value(self, collector):
+ ctx = BaseHook()
+ key1 = collector._generate_extra_entry_id("k", "v1", ctx)
+ key2 = collector._generate_extra_entry_id("k", "v2", ctx)
+ key3 = collector._generate_extra_entry_id("k2", "v1", ctx)
+ assert key1 != key2
+ assert key1 != key3
+
def test_are_assets_collected(self, collector):
assert collector is not None
assert collector.collected_assets == HookLineage()
@@ -78,10 +142,10 @@ class TestHookLineageCollector:
asset = MagicMock(spec=Asset, extra={})
mock_asset.return_value = asset
- hook = MagicMock(spec=BaseHook)
- collector.add_input_asset(hook, uri="test_uri")
+ mock_hook = MagicMock(spec=BaseHook)
+ collector.add_input_asset(mock_hook, uri="test_uri")
- assert next(iter(collector._inputs.values())) == (asset, hook)
+ assert next(iter(collector._inputs.values())) == (asset, mock_hook)
mock_asset.assert_called_once_with(uri="test_uri")
def test_grouping_assets(self, collector):
@@ -166,6 +230,9 @@ class TestHookLineageCollector:
is None
)
+ def test_create_asset_missing_parameters_returns_none(self, collector):
+ assert collector.create_asset() is None
+
def test_collected_assets(self, collector):
context_input = MagicMock(spec=BaseHook)
context_output = MagicMock(spec=BaseHook)
@@ -187,8 +254,122 @@ class TestHookLineageCollector:
collector._inputs = {"unique_key": (MagicMock(spec=Asset),
MagicMock(spec=BaseHook))}
assert collector.has_collected
- def test_hooks_limit_input_output_assets(self):
- collector = HookLineageCollector()
+ def test_add_asset_count_tracking(self, collector):
+ """Test that duplicate assets are counted correctly."""
+ ctx = MagicMock(spec=BaseHook)
+
+ # Add same input multiple times
+ collector.add_input_asset(ctx, uri="s3://bucket/input")
+ collector.add_input_asset(ctx, uri="s3://bucket/input")
+ collector.add_input_asset(ctx, uri="s3://bucket/input")
+
+ # Add same output multiple times
+ collector.add_output_asset(ctx, uri="s3://bucket/output")
+ collector.add_output_asset(ctx, uri="s3://bucket/output")
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 1
+ assert lineage.inputs[0].asset.uri == "s3://bucket/input"
+ assert lineage.inputs[0].count == 3
+
+ assert len(lineage.outputs) == 1
+ assert lineage.outputs[0].asset.uri == "s3://bucket/output"
+ assert lineage.outputs[0].count == 2
+
+ def test_add_asset_different_uris(self, collector):
+ """Test that different URIs are tracked separately."""
+ ctx = MagicMock(spec=BaseHook)
+
+ collector.add_input_asset(ctx, uri="s3://bucket/file1")
+ collector.add_input_asset(ctx, uri="s3://bucket/file2")
+ collector.add_input_asset(ctx,
uri="postgres://example.com:5432/database/default/table")
+
+ collector.add_output_asset(ctx, uri="s3://output/file1")
+ collector.add_output_asset(ctx, uri="s3://output/file2")
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 3
+ assert lineage.inputs[0].asset.uri == "s3://bucket/file1"
+ assert lineage.inputs[1].asset.uri == "s3://bucket/file2"
+ assert lineage.inputs[2].asset.uri ==
"postgres://example.com:5432/database/default/table"
+
+ assert len(lineage.outputs) == 2
+ assert lineage.outputs[0].asset.uri == "s3://output/file1"
+ assert lineage.outputs[1].asset.uri == "s3://output/file2"
+
+ def test_add_asset_different_contexts(self, collector):
+ """Test that different contexts are tracked separately."""
+ ctx1 = MagicMock(spec=BaseHook)
+ ctx2 = MagicMock(spec=BaseHook)
+
+ collector.add_input_asset(ctx1, uri="s3://bucket/file")
+ collector.add_input_asset(ctx2, uri="s3://bucket/file")
+
+ collector.add_output_asset(ctx1, uri="s3://output/file")
+ collector.add_output_asset(ctx2, uri="s3://output/file")
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 2
+ assert lineage.inputs[0].context == ctx1
+ assert lineage.inputs[1].context == ctx2
+
+ assert len(lineage.outputs) == 2
+ assert lineage.outputs[0].context == ctx1
+ assert lineage.outputs[1].context == ctx2
+
+ def test_add_asset_with_extra_metadata(self, collector):
+ """Test adding assets with extra metadata."""
+ ctx = MagicMock(spec=BaseHook)
+
+ collector.add_input_asset(
+ ctx,
+ uri="postgres://example.com:5432/database/default/table",
+ asset_extra={"schema": "public", "table": "users"},
+ )
+ collector.add_output_asset(
+ ctx,
+ uri="postgres://example.com:5432/database/default/table",
+ asset_extra={"schema": "public", "table": "results"},
+ )
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 1
+ assert lineage.inputs[0].asset.extra == {"schema": "public", "table":
"users"}
+
+ assert len(lineage.outputs) == 1
+ assert lineage.outputs[0].asset.extra == {"schema": "public", "table":
"results"}
+
+ def test_add_asset_different_extra_values(self, collector):
+ """Test that assets with different extra values are tracked
separately."""
+ ctx = MagicMock(spec=BaseHook)
+
+ # Same URI but different extra metadata
+ collector.add_input_asset(ctx, uri="s3://bucket/file",
asset_extra={"version": "1"})
+ collector.add_input_asset(ctx, uri="s3://bucket/file",
asset_extra={"version": "2"})
+
+ collector.add_output_asset(ctx, uri="s3://output/file",
asset_extra={"format": "parquet"})
+ collector.add_output_asset(ctx, uri="s3://output/file",
asset_extra={"format": "csv"})
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 2
+ assert lineage.inputs[0].asset.extra == {"version": "1"}
+ assert lineage.inputs[1].asset.extra == {"version": "2"}
+
+ assert len(lineage.outputs) == 2
+ assert lineage.outputs[0].asset.extra == {"format": "parquet"}
+ assert lineage.outputs[1].asset.extra == {"format": "csv"}
+
+ def test_hooks_limit_input_output_assets(self, collector):
assert not collector.has_collected
for i in range(1000):
@@ -199,6 +380,484 @@ class TestHookLineageCollector:
assert len(collector._inputs) == 100
assert len(collector._outputs) == 100
+ @pytest.mark.parametrize("uri", ["", None])
+ def test_invalid_uri_none(self, collector, uri):
+ """Test handling of None or empty URI - should not raise."""
+ ctx = MagicMock(spec=BaseHook)
+
+ # Should not raise exceptions
+ collector.add_input_asset(ctx, uri=uri)
+ collector.add_output_asset(ctx, uri=uri)
+
+ # Collector should handle gracefully and not collect invalid URIs
+ assert not collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 0
+ assert len(lineage.outputs) == 0
+
+ def test_malformed_uri(self, collector):
+ """Test handling of malformed URIs - should not raise."""
+ ctx = MagicMock(spec=BaseHook)
+
+ # Various malformed URIs should not cause crashes
+ collector.add_input_asset(ctx, uri="not-a-valid-uri")
+ collector.add_input_asset(ctx, uri="://missing-scheme")
+ collector.add_input_asset(ctx, uri="scheme:")
+ collector.add_output_asset(ctx, uri="//no-scheme")
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 3
+ assert lineage.inputs[0].asset.uri == "not-a-valid-uri"
+ assert lineage.inputs[1].asset.uri == "://missing-scheme"
+ assert lineage.inputs[2].asset.uri == "scheme:/"
+
+ assert len(lineage.outputs) == 1
+ assert lineage.outputs[0].asset.uri == "//no-scheme"
+
+ def test_very_long_uri(self, collector):
+ """Test handling of very long URIs - 1000 chars OK, 2000 chars raises
ValueError."""
+ ctx = MagicMock(spec=BaseHook)
+
+ # Create very long URI (1000 chars - should work)
+ long_path = "a" * 1000
+ long_uri = f"s3://bucket/{long_path}"
+
+ # Create too long URI (2000 chars - should raise)
+ too_long_uri = f"s3://bucket/{long_path * 2}"
+
+ collector.add_input_asset(ctx, uri=long_uri)
+
+ # Too long URI should raise ValueError
+ with pytest.raises(ValueError, match="Asset name cannot exceed"):
+ collector.add_output_asset(ctx, uri=too_long_uri)
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 1
+ assert lineage.inputs[0].asset.uri == long_uri
+
+ assert len(lineage.outputs) == 0
+
+ def test_uri_with_special_characters(self, collector):
+ """Test URIs with special characters - should not raise."""
+ ctx = MagicMock(spec=BaseHook)
+
+ # URIs with various special characters
+ special_uris = {
+ "s3://bucket/path with spaces/file": "s3://bucket/path with
spaces/file",
+ "s3://bucket/path%20encoded/file":
"s3://bucket/path%20encoded/file",
+ "file:///path/with/üñíçødé/chars":
"file:///path/with/üñíçødé/chars",
+ "scheme://host/path?query=value&other=123":
"scheme://host/path?other=123&query=value",
+ "scheme://host/path#fragment": "scheme://host/path",
+ "postgres://user:p@ss!word@host:5432/db/sche$ma/table":
"postgres://host:5432/db/sche$ma/table",
+ }
+
+ for uri in special_uris:
+ collector.add_input_asset(ctx, uri=uri)
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 6
+
+ for i, expected_uri in enumerate(special_uris.values()):
+ assert lineage.inputs[i].asset.uri == expected_uri
+
+ def test_empty_asset_extra(self, collector):
+ """Test that empty asset_extra is handled correctly."""
+ ctx = MagicMock(spec=BaseHook)
+
+ collector.add_input_asset(ctx, uri="s3://bucket/file", asset_extra={})
+ collector.add_output_asset(ctx, uri="s3://bucket/file", asset_extra={})
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 1
+ assert lineage.inputs[0].asset.extra == {}
+ assert len(lineage.outputs) == 1
+ assert lineage.outputs[0].asset.extra == {}
+
+ def test_asset_with_all_optional_parameters(self, collector):
+ """Test asset creation with all optional parameters provided."""
+ ctx = MagicMock(spec=BaseHook)
+
+ collector.add_input_asset(
+ ctx,
+ uri="s3://bucket/file",
+ name="custom-name",
+ group="custom-group",
+ asset_extra={"key": "value"},
+ )
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 1
+ assert lineage.inputs[0].asset.uri == "s3://bucket/file"
+ assert lineage.inputs[0].asset.name == "custom-name"
+ assert lineage.inputs[0].asset.group == "custom-group"
+ assert lineage.inputs[0].asset.extra == {"key": "value"}
+
+ def test_asset_extra_with_non_serializable(self, collector):
+ """Test that asset_extra with non-JSON-serializable values is
handled."""
+ ctx = MagicMock(spec=BaseHook)
+
+ class CustomObject:
+ pass
+
+ # Should not raise
+ collector.add_input_asset(ctx, uri="s3://bucket/file",
asset_extra={"obj": CustomObject()})
+
+ # May or may not be collected depending on implementation
+ lineage = collector.collected_assets
+ # Just verify it doesn't crash and structure is intact
+ assert isinstance(lineage.inputs, list)
+ assert isinstance(lineage.outputs, list)
+
+ def test_empty_name_and_group(self, collector):
+ """Test that empty strings for name and group are handled."""
+ ctx = MagicMock(spec=BaseHook)
+
+ # Empty strings for optional parameters
+ collector.add_input_asset(ctx, uri="s3://bucket/file", name="",
group="")
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 1
+ assert lineage.inputs[0].asset.uri == "s3://bucket/file"
+ assert lineage.inputs[0].asset.name == "s3://bucket/file"
+ assert lineage.inputs[0].asset.group == "asset"
+
+ def test_add_extra(self, collector):
+ ctx = MagicMock(spec=BaseHook)
+ collector.add_extra(ctx, "k", "v")
+
+ data = collector.collected_assets.extra
+ assert len(data) == 1
+ assert data[0].key == "k"
+ assert data[0].value == "v"
+ assert data[0].context == ctx
+ assert data[0].count == 1
+
+ # adding again with same values only increments count
+ collector.add_extra(ctx, "k", "v")
+ assert collector.collected_assets.extra[0].count == 2
+ data = collector.collected_assets.extra
+ assert len(data) == 1
+
+ def test_add_extra_missing_key_or_value(self, collector):
+ ctx = MagicMock(spec=BaseHook)
+
+ collector.add_extra(ctx, "", "v")
+ collector.add_extra(ctx, "k", None)
+
+ # nothing added
+ assert len(collector.collected_assets.extra) == 0
+
+ def test_extra_limit(self, collector):
+ ctx = MagicMock(spec=BaseHook)
+
+ for i in range(501):
+ collector.add_extra(ctx, f"k{i}", f"v{i}")
+
+ assert len(collector.collected_assets.extra) == 200
+
+ def test_add_extra_different_values(self, collector):
+ """Test that different values are tracked separately."""
+ ctx = MagicMock(spec=BaseHook)
+
+ collector.add_extra(ctx, "key1", {"data": "value1"})
+ collector.add_extra(ctx, "key2", {"data": "value2"})
+ collector.add_extra(ctx, "key1", {"data": "value3"})
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.extra) == 3
+ assert lineage.extra[0].key == "key1"
+ assert lineage.extra[0].value == {"data": "value1"}
+ assert lineage.extra[0].count == 1
+ assert lineage.extra[1].key == "key2"
+ assert lineage.extra[1].value == {"data": "value2"}
+ assert lineage.extra[1].count == 1
+ assert lineage.extra[2].key == "key1"
+ assert lineage.extra[2].value == {"data": "value3"}
+ assert lineage.extra[2].count == 1
+
+ def test_add_extra_different_contexts(self, collector):
+ """Test that different contexts are tracked separately."""
+ ctx1 = MagicMock(spec=BaseHook)
+ ctx2 = MagicMock(spec=BaseHook)
+
+ collector.add_extra(ctx1, "test_key", {"data": "value"})
+ collector.add_extra(ctx2, "test_key", {"data": "value"})
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.extra) == 2
+ assert lineage.extra[0].context == ctx1
+ assert lineage.extra[1].context == ctx2
+
+ def test_add_extra_complex_values(self, collector):
+ """Test that add_extra handles complex JSON-serializable values."""
+ ctx = MagicMock(spec=BaseHook)
+
+ collector.add_extra(ctx, "dict", {"nested": {"data": "value"}})
+ collector.add_extra(ctx, "list", [1, 2, 3, "test"])
+ collector.add_extra(ctx, "number", 42)
+ collector.add_extra(ctx, "string", "simple string")
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.extra) == 4
+ assert lineage.extra[0].value == {"nested": {"data": "value"}}
+ assert lineage.extra[1].value == [1, 2, 3, "test"]
+ assert lineage.extra[2].value == 42
+ assert lineage.extra[3].value == "simple string"
+
+ def test_special_characters_in_extra_key(self, collector):
+ """Test that extra keys with special characters work."""
+ ctx = MagicMock(spec=BaseHook)
+
+ collector.add_extra(ctx, "key-with-dashes", {"data": "value"})
+ collector.add_extra(ctx, "key.with.dots", {"data": "value"})
+ collector.add_extra(ctx, "key_with_underscores", {"data": "value"})
+ collector.add_extra(ctx, "key/with/slashes", {"data": "value"})
+ collector.add_extra(ctx, "key:with:colons", {"data": "value"})
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.extra) == 5
+ assert lineage.extra[0].key == "key-with-dashes"
+ assert lineage.extra[1].key == "key.with.dots"
+ assert lineage.extra[2].key == "key_with_underscores"
+ assert lineage.extra[3].key == "key/with/slashes"
+ assert lineage.extra[4].key == "key:with:colons"
+
+ def test_unicode_in_extra_key_and_value(self, collector):
+ """Test that unicode characters in extra work correctly."""
+ ctx = MagicMock(spec=BaseHook)
+
+ collector.add_extra(ctx, "clé_française", {"données": "valeur"})
+ collector.add_extra(ctx, "中文键", {"中文": "值"})
+ collector.add_extra(ctx, "مفتاح", {"بيانات": "قيمة"})
+ collector.add_extra(ctx, "emoji_🚀", {"status": "✅"})
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.extra) == 4
+ assert lineage.extra[0].key == "clé_française"
+ assert lineage.extra[0].value == {"données": "valeur"}
+ assert lineage.extra[1].key == "中文键"
+ assert lineage.extra[1].value == {"中文": "值"}
+ assert lineage.extra[2].key == "مفتاح"
+ assert lineage.extra[2].value == {"بيانات": "قيمة"}
+ assert lineage.extra[3].key == "emoji_🚀"
+ assert lineage.extra[3].value == {"status": "✅"}
+
+ def test_very_large_extra_value(self, collector):
+ """Test that large extra values are handled."""
+ ctx = MagicMock(spec=BaseHook)
+
+ # Create a large value
+ large_value = {"data": "x" * 1000, "list": list(range(1000))}
+
+ collector.add_extra(ctx, "large_key", large_value)
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.extra) == 1
+ assert lineage.extra[0].key == "large_key"
+ assert lineage.extra[0].value == large_value
+
+ def test_deeply_nested_extra_value(self, collector):
+ """Test that deeply nested data structures in extra are handled."""
+ ctx = MagicMock(spec=BaseHook)
+
+ # Create deeply nested structure
+ nested_value = {"level1": {"level2": {"level3": {"level4": {"level5":
{"data": "deep"}}}}}}
+
+ collector.add_extra(ctx, "nested", nested_value)
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.extra) == 1
+ assert lineage.extra[0].value == nested_value
+
+ def test_extra_value_with_various_types(self, collector):
+ """Test that extra can handle various data types."""
+ ctx = MagicMock(spec=BaseHook)
+
+ collector.add_extra(ctx, "string", "text")
+ collector.add_extra(ctx, "integer", 42)
+ collector.add_extra(ctx, "float", 3.14)
+ collector.add_extra(ctx, "boolean", True)
+ collector.add_extra(ctx, "list", [1, 2, 3])
+ collector.add_extra(ctx, "dict", {"key": "value"})
+ collector.add_extra(ctx, "null", None)
+
+ assert collector.has_collected
+
+ # None value should not be collected (based on validation)
+ lineage = collector.collected_assets
+ assert len(lineage.extra) == 6 # None is filtered out
+
+ assert lineage.extra[0].value == "text"
+ assert lineage.extra[1].value == 42
+ assert lineage.extra[2].value == 3.14
+ assert lineage.extra[3].value is True
+ assert lineage.extra[4].value == [1, 2, 3]
+ assert lineage.extra[5].value == {"key": "value"}
+
+ def test_non_json_serializable_value_in_extra(self, collector):
+ """Test that non-JSON-serializable values are handled gracefully."""
+ ctx = MagicMock(spec=BaseHook)
+
+ # Create a non-serializable object
+ class CustomObject:
+ def __str__(self):
+ return "custom_object"
+
+ # Should not raise - collector should handle via str conversion or skip
+ collector.add_extra(ctx, "custom_key", CustomObject())
+
+ # May or may not be collected depending on implementation
+ lineage = collector.collected_assets
+ # Just verify it doesn't crash
+ assert isinstance(lineage.extra, list)
+
+ def test_extremely_long_extra_key(self, collector):
+ """Test that extremely long extra keys are handled."""
+ ctx = MagicMock(spec=BaseHook)
+
+ long_key = "k" * 1000
+ collector.add_extra(ctx, long_key, "value")
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.extra) == 1
+ assert lineage.extra[0].key == long_key
+ assert lineage.extra[0].value == "value"
+
+ def test_collected_assets_called_multiple_times(self, collector):
+ """Test that collected_assets property can be called multiple times."""
+ ctx = MagicMock(spec=BaseHook)
+
+ collector.add_input_asset(ctx, uri="s3://bucket/file")
+
+ assert collector.has_collected
+
+ # Call multiple times - should return same data
+ lineage1 = collector.collected_assets
+ lineage2 = collector.collected_assets
+ lineage3 = collector.collected_assets
+
+ assert lineage1.inputs == lineage2.inputs == lineage3.inputs
+ assert len(lineage1.inputs) == 1
+
+ def test_has_collected_only_extra(self, collector):
+ assert collector.has_collected is False
+
+ collector.add_extra(MagicMock(spec=BaseHook), "event", "trigger")
+
+ assert collector.has_collected is True
+ assert len(collector.collected_assets.inputs) == 0
+ assert len(collector.collected_assets.outputs) == 0
+ assert len(collector.collected_assets.extra) == 1
+
+ def test_none_context(self, collector):
+ """Test handling of None context - should not raise."""
+ # Should not raise exceptions
+ collector.add_input_asset(None, uri="s3://bucket/input")
+ collector.add_output_asset(None, uri="s3://bucket/output")
+ collector.add_extra(None, "key", "value")
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 1
+ assert lineage.inputs[0].context is None
+
+ assert len(lineage.outputs) == 1
+ assert lineage.outputs[0].context is None
+
+ assert len(lineage.extra) == 1
+ assert lineage.extra[0].context is None
+
+ def test_rapid_repeated_calls(self, collector):
+ """Test that rapid repeated calls don't cause issues."""
+ ctx = MagicMock(spec=BaseHook)
+
+ # Simulate rapid repeated calls
+ for _ in range(50):
+ collector.add_input_asset(ctx, uri="s3://bucket/file")
+ collector.add_output_asset(ctx, uri="s3://bucket/output")
+ collector.add_extra(ctx, "key", "value")
+
+ assert collector.has_collected
+
+ lineage = collector.collected_assets
+ # Should have counted properly
+ assert len(lineage.inputs) == 1
+ assert lineage.inputs[0].count == 50
+ assert len(lineage.outputs) == 1
+ assert lineage.outputs[0].count == 50
+ assert len(lineage.extra) == 1
+ assert lineage.extra[0].count == 50
+
+ def test_mixed_valid_invalid_operations(self, collector):
+ """Test mixing valid and invalid operations."""
+ ctx = MagicMock(spec=BaseHook)
+
+ # Mix valid and invalid calls
+ collector.add_input_asset(ctx, uri="s3://bucket/valid")
+ collector.add_input_asset(ctx, uri=None) # Invalid - should not be
collected
+ collector.add_input_asset(ctx, uri="") # Invalid - should not be
collected
+ collector.add_input_asset(ctx, uri="s3://bucket/another-valid")
+
+ collector.add_extra(ctx, "valid_key", "valid_value")
+ collector.add_extra(ctx, "", "invalid_key") # Invalid key - should
not be collected
+ collector.add_extra(ctx, "another_key", "another_value")
+
+ assert collector.has_collected
+
+ # Should collect only valid items
+ lineage = collector.collected_assets
+ assert len(lineage.inputs) == 2
+ assert lineage.inputs[0].asset.uri == "s3://bucket/valid"
+ assert lineage.inputs[1].asset.uri == "s3://bucket/another-valid"
+
+ assert len(lineage.extra) == 2
+ assert lineage.extra[0].key == "valid_key"
+ assert lineage.extra[0].value == "valid_value"
+ assert lineage.extra[1].key == "another_key"
+ assert lineage.extra[1].value == "another_value"
+
+ def test_noop_collector(self):
+ noop = NoOpCollector()
+ ctx = MagicMock(spec=BaseHook)
+ noop.add_input_asset(ctx, uri="x")
+ noop.add_output_asset(ctx, uri="y")
+ noop.add_extra(ctx, "k", "v")
+
+ lineage = noop.collected_assets
+ assert lineage.inputs == []
+ assert lineage.outputs == []
+ assert lineage.extra == []
+
class FakePlugin(plugins_manager.AirflowPlugin):
name = "FakePluginHavingHookLineageCollector"