This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dc1baec36d9 feat: Make Hook Level Lineage limits configurable (#62010)
dc1baec36d9 is described below
commit dc1baec36d94d3e9906ee23ea0a18639bf301404
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Feb 17 15:48:08 2026 +0100
feat: Make Hook Level Lineage limits configurable (#62010)
---
.../src/airflow/config_templates/config.yml | 17 +++++
task-sdk/src/airflow/sdk/lineage.py | 5 +-
task-sdk/tests/task_sdk/test_lineage.py | 78 +++++++++++++++++++---
3 files changed, 89 insertions(+), 11 deletions(-)
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index 627399b0d86..ae27e45671a 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2031,6 +2031,23 @@ lineage:
       type: string
       example: ~
       default: ""
+    max_assets_per_collector:
+      description: |
+        Maximum number of asset inputs or outputs that can be collected by a single hook lineage collector
+        instance. Input assets and output assets are counted separately. Once the maximum is reached, any
+        additional assets will be dropped.
+      version_added: 3.2.0
+      type: integer
+      example: ~
+      default: "100"
+    max_extras_per_collector:
+      description: |
+        Maximum number of extra metadata entries that can be collected by a single hook lineage collector
+        instance. Once the maximum is reached, any additional extra metadata will be dropped.
+      version_added: 3.2.0
+      type: integer
+      example: ~
+      default: "200"
 operators:
   description: ~
   options:
diff --git a/task-sdk/src/airflow/sdk/lineage.py b/task-sdk/src/airflow/sdk/lineage.py
index acdf284a596..82d00492948 100644
--- a/task-sdk/src/airflow/sdk/lineage.py
+++ b/task-sdk/src/airflow/sdk/lineage.py
@@ -26,6 +26,7 @@ from typing import TYPE_CHECKING, Any, TypeAlias
 import attr
 import structlog
 
+from airflow.sdk.configuration import conf
 from airflow.sdk.definitions._internal.logging_mixin import LoggingMixin
 from airflow.sdk.definitions.asset import Asset
 from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
@@ -44,10 +45,10 @@ log = structlog.getLogger(__name__)
 
 # Maximum number of assets input or output that can be collected in a single hook execution.
 # Input assets and output assets are collected separately.
-MAX_COLLECTED_ASSETS = 100
+MAX_COLLECTED_ASSETS: int = conf.getint("lineage", "max_assets_per_collector", fallback=100)
 
 # Maximum number of extra metadata that can be collected in a single hook execution.
-MAX_COLLECTED_EXTRA = 200
+MAX_COLLECTED_EXTRA: int = conf.getint("lineage", "max_extras_per_collector", fallback=200)
 
 
 @attr.define
diff --git a/task-sdk/tests/task_sdk/test_lineage.py b/task-sdk/tests/task_sdk/test_lineage.py
index 3bf706cabb4..d312df330d6 100644
--- a/task-sdk/tests/task_sdk/test_lineage.py
+++ b/task-sdk/tests/task_sdk/test_lineage.py
@@ -28,9 +28,9 @@ from airflow.sdk.lineage import (
     HookLineageCollector,
     HookLineageReader,
     NoOpCollector,
-    get_hook_lineage_collector,
 )
 
+from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.mock_plugins import mock_plugin_manager
 
 
@@ -39,6 +39,18 @@ class TestHookLineageCollector:
     def collector(self):
         return HookLineageCollector()
 
+    def test_default_max_collected_assets(self):
+        """Default MAX_COLLECTED_ASSETS is 100 (configurable via [lineage] conf)."""
+        from airflow.sdk.lineage import MAX_COLLECTED_ASSETS
+
+        assert MAX_COLLECTED_ASSETS == 100
+
+    def test_default_max_collected_extra(self):
+        """Default MAX_COLLECTED_EXTRA is 200 (configurable via [lineage] conf)."""
+        from airflow.sdk.lineage import MAX_COLLECTED_EXTRA
+
+        assert MAX_COLLECTED_EXTRA == 200
+
     def test_generate_hash_handles_non_serializable(self, collector):
         class Obj:
             def __str__(self):
@@ -377,6 +389,27 @@ class TestHookLineageCollector:
         assert len(collector._inputs) == 100
         assert len(collector._outputs) == 100
 
+    def test_configurable_max_collected_assets(self):
+        """MAX_COLLECTED_ASSETS is read from [lineage] conf and limits both inputs and outputs."""
+        import importlib
+
+        import airflow.sdk.lineage as lineage_mod
+
+        new_max = 4
+
+        with conf_vars({("lineage", "max_assets_per_collector"): str(new_max)}):
+            importlib.reload(lineage_mod)
+            assert new_max == lineage_mod.MAX_COLLECTED_ASSETS
+
+            collector = lineage_mod.HookLineageCollector()
+            for i in range(new_max * 2):
+                collector.add_input_asset(MagicMock(spec=BaseHook), uri=f"test://input/{i}")
+                collector.add_output_asset(MagicMock(spec=BaseHook), uri=f"test://output/{i}")
+            assert len(collector._inputs) == new_max
+            assert len(collector._outputs) == new_max
+
+        importlib.reload(lineage_mod)
+
     @pytest.mark.parametrize("uri", ["", None])
     def test_invalid_uri_none(self, collector, uri):
         """Test handling of None or empty URI - should not raise."""
@@ -565,6 +598,26 @@ class TestHookLineageCollector:
 
         assert len(collector.collected_assets.extra) == 200
 
+    def test_configurable_max_collected_extra(self):
+        """MAX_COLLECTED_EXTRA is read from [lineage] conf and limits extra collection."""
+        import importlib
+
+        import airflow.sdk.lineage as lineage_mod
+
+        new_max = 4
+
+        with conf_vars({("lineage", "max_extras_per_collector"): str(new_max)}):
+            importlib.reload(lineage_mod)
+            assert new_max == lineage_mod.MAX_COLLECTED_EXTRA
+
+            collector = lineage_mod.HookLineageCollector()
+            ctx = MagicMock(spec=BaseHook)
+            for i in range(new_max * 2):
+                collector.add_extra(ctx, f"key_{i}", f"value_{i}")
+            assert len(collector.collected_assets.extra) == new_max
+
+        importlib.reload(lineage_mod)
+
     def test_add_extra_different_values(self, collector):
         """Test that different values are tracked separately."""
         ctx = MagicMock(spec=BaseHook)
@@ -862,16 +915,23 @@ class FakePlugin(plugins_manager.AirflowPlugin):
 
 
 @pytest.mark.parametrize(
-    ("has_readers", "expected_class"),
+    ("has_readers", "expected_class_name"),
     [
-        (True, HookLineageCollector),
-        (False, NoOpCollector),
+        (True, "HookLineageCollector"),
+        (False, "NoOpCollector"),
     ],
 )
-def test_get_hook_lineage_collector(has_readers, expected_class):
-    # reset cached instance
-    get_hook_lineage_collector.cache_clear()
+def test_get_hook_lineage_collector(has_readers, expected_class_name):
+    # After importlib.reload in other tests, the top-level imported references
+    # (get_hook_lineage_collector, HookLineageCollector, NoOpCollector) may point
+    # to stale class/function objects. Always fetch from the live module so that
+    # the function, the returned instance, and the class used in isinstance() all
+    # belong to the same reload generation.
+    import airflow.sdk.lineage as lineage_mod
+
+    lineage_mod.get_hook_lineage_collector.cache_clear()
+    expected_class = getattr(lineage_mod, expected_class_name)
     plugins = [FakePlugin()] if has_readers else []
     with mock_plugin_manager(plugins=plugins):
-        assert isinstance(get_hook_lineage_collector(), expected_class)
-        assert get_hook_lineage_collector() is get_hook_lineage_collector()
+        assert isinstance(lineage_mod.get_hook_lineage_collector(), expected_class)
+        assert lineage_mod.get_hook_lineage_collector() is lineage_mod.get_hook_lineage_collector()