This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dc1baec36d9 feat: Make Hook Level Lineage limits configurable (#62010)
dc1baec36d9 is described below

commit dc1baec36d94d3e9906ee23ea0a18639bf301404
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Feb 17 15:48:08 2026 +0100

    feat: Make Hook Level Lineage limits configurable (#62010)
---
 .../src/airflow/config_templates/config.yml        | 17 +++++
 task-sdk/src/airflow/sdk/lineage.py                |  5 +-
 task-sdk/tests/task_sdk/test_lineage.py            | 78 +++++++++++++++++++---
 3 files changed, 89 insertions(+), 11 deletions(-)

diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index 627399b0d86..ae27e45671a 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2031,6 +2031,23 @@ lineage:
       type: string
       example: ~
       default: ""
+    max_assets_per_collector:
+      description: |
+        Maximum number of asset inputs or outputs that can be collected by a single hook lineage collector
+        instance. Input assets and output assets are counted separately. Once the maximum is reached, any
+        additional assets will be dropped.
+      version_added: 3.2.0
+      type: integer
+      example: ~
+      default: "100"
+    max_extras_per_collector:
+      description: |
+        Maximum number of extra metadata entries that can be collected by a single hook lineage collector
+        instance. Once the maximum is reached, any additional extra metadata will be dropped.
+      version_added: 3.2.0
+      type: integer
+      example: ~
+      default: "200"
 operators:
   description: ~
   options:
diff --git a/task-sdk/src/airflow/sdk/lineage.py b/task-sdk/src/airflow/sdk/lineage.py
index acdf284a596..82d00492948 100644
--- a/task-sdk/src/airflow/sdk/lineage.py
+++ b/task-sdk/src/airflow/sdk/lineage.py
@@ -26,6 +26,7 @@ from typing import TYPE_CHECKING, Any, TypeAlias
 import attr
 import structlog
 
+from airflow.sdk.configuration import conf
 from airflow.sdk.definitions._internal.logging_mixin import LoggingMixin
 from airflow.sdk.definitions.asset import Asset
 from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
@@ -44,10 +45,10 @@ log = structlog.getLogger(__name__)
 
 # Maximum number of assets input or output that can be collected in a single hook execution.
 # Input assets and output assets are collected separately.
-MAX_COLLECTED_ASSETS = 100
+MAX_COLLECTED_ASSETS: int = conf.getint("lineage", "max_assets_per_collector", 
fallback=100)
 
 # Maximum number of extra metadata that can be collected in a single hook execution.
-MAX_COLLECTED_EXTRA = 200
+MAX_COLLECTED_EXTRA: int = conf.getint("lineage", "max_extras_per_collector", 
fallback=200)
 
 
 @attr.define
diff --git a/task-sdk/tests/task_sdk/test_lineage.py b/task-sdk/tests/task_sdk/test_lineage.py
index 3bf706cabb4..d312df330d6 100644
--- a/task-sdk/tests/task_sdk/test_lineage.py
+++ b/task-sdk/tests/task_sdk/test_lineage.py
@@ -28,9 +28,9 @@ from airflow.sdk.lineage import (
     HookLineageCollector,
     HookLineageReader,
     NoOpCollector,
-    get_hook_lineage_collector,
 )
 
+from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.mock_plugins import mock_plugin_manager
 
 
@@ -39,6 +39,18 @@ class TestHookLineageCollector:
     def collector(self):
         return HookLineageCollector()
 
+    def test_default_max_collected_assets(self):
+        """Default MAX_COLLECTED_ASSETS is 100 (configurable via [lineage] 
conf)."""
+        from airflow.sdk.lineage import MAX_COLLECTED_ASSETS
+
+        assert MAX_COLLECTED_ASSETS == 100
+
+    def test_default_max_collected_extra(self):
+        """Default MAX_COLLECTED_EXTRA is 200 (configurable via [lineage] 
conf)."""
+        from airflow.sdk.lineage import MAX_COLLECTED_EXTRA
+
+        assert MAX_COLLECTED_EXTRA == 200
+
     def test_generate_hash_handles_non_serializable(self, collector):
         class Obj:
             def __str__(self):
@@ -377,6 +389,27 @@ class TestHookLineageCollector:
         assert len(collector._inputs) == 100
         assert len(collector._outputs) == 100
 
+    def test_configurable_max_collected_assets(self):
+        """MAX_COLLECTED_ASSETS is read from [lineage] conf and limits both 
inputs and outputs."""
+        import importlib
+
+        import airflow.sdk.lineage as lineage_mod
+
+        new_max = 4
+
+        with conf_vars({("lineage", "max_assets_per_collector"): 
str(new_max)}):
+            importlib.reload(lineage_mod)
+            assert new_max == lineage_mod.MAX_COLLECTED_ASSETS
+
+            collector = lineage_mod.HookLineageCollector()
+            for i in range(new_max * 2):
+                collector.add_input_asset(MagicMock(spec=BaseHook), 
uri=f"test://input/{i}")
+                collector.add_output_asset(MagicMock(spec=BaseHook), 
uri=f"test://output/{i}")
+            assert len(collector._inputs) == new_max
+            assert len(collector._outputs) == new_max
+
+        importlib.reload(lineage_mod)
+
     @pytest.mark.parametrize("uri", ["", None])
     def test_invalid_uri_none(self, collector, uri):
         """Test handling of None or empty URI - should not raise."""
@@ -565,6 +598,26 @@ class TestHookLineageCollector:
 
         assert len(collector.collected_assets.extra) == 200
 
+    def test_configurable_max_collected_extra(self):
+        """MAX_COLLECTED_EXTRA is read from [lineage] conf and limits extra 
collection."""
+        import importlib
+
+        import airflow.sdk.lineage as lineage_mod
+
+        new_max = 4
+
+        with conf_vars({("lineage", "max_extras_per_collector"): 
str(new_max)}):
+            importlib.reload(lineage_mod)
+            assert new_max == lineage_mod.MAX_COLLECTED_EXTRA
+
+            collector = lineage_mod.HookLineageCollector()
+            ctx = MagicMock(spec=BaseHook)
+            for i in range(new_max * 2):
+                collector.add_extra(ctx, f"key_{i}", f"value_{i}")
+            assert len(collector.collected_assets.extra) == new_max
+
+        importlib.reload(lineage_mod)
+
     def test_add_extra_different_values(self, collector):
         """Test that different values are tracked separately."""
         ctx = MagicMock(spec=BaseHook)
@@ -862,16 +915,23 @@ class FakePlugin(plugins_manager.AirflowPlugin):
 
 
 @pytest.mark.parametrize(
-    ("has_readers", "expected_class"),
+    ("has_readers", "expected_class_name"),
     [
-        (True, HookLineageCollector),
-        (False, NoOpCollector),
+        (True, "HookLineageCollector"),
+        (False, "NoOpCollector"),
     ],
 )
-def test_get_hook_lineage_collector(has_readers, expected_class):
-    # reset cached instance
-    get_hook_lineage_collector.cache_clear()
+def test_get_hook_lineage_collector(has_readers, expected_class_name):
+    # After importlib.reload in other tests, the top-level imported references
+    # (get_hook_lineage_collector, HookLineageCollector, NoOpCollector) may point
+    # to stale class/function objects. Always fetch from the live module so that
+    # the function, the returned instance, and the class used in isinstance() all
+    # belong to the same reload generation.
+    import airflow.sdk.lineage as lineage_mod
+
+    lineage_mod.get_hook_lineage_collector.cache_clear()
+    expected_class = getattr(lineage_mod, expected_class_name)
     plugins = [FakePlugin()] if has_readers else []
     with mock_plugin_manager(plugins=plugins):
-        assert isinstance(get_hook_lineage_collector(), expected_class)
-        assert get_hook_lineage_collector() is get_hook_lineage_collector()
+        assert isinstance(lineage_mod.get_hook_lineage_collector(), 
expected_class)
+        assert lineage_mod.get_hook_lineage_collector() is 
lineage_mod.get_hook_lineage_collector()

Reply via email to