This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d445240be7d Fix `RecursionError` in `common.compat` hook lineage
`add_extra` polyfill (#68735)
d445240be7d is described below
commit d445240be7d332933e342f6cc472b39a85795ed3
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Jun 19 12:05:46 2026 +0100
Fix `RecursionError` in `common.compat` hook lineage `add_extra` polyfill
(#68735)
`_add_extra_polyfill` re-patched the collector's
`collected_assets`/`has_collected` class properties on every call (the trigger
gate keys on instance-level `_extra`), stacking one wrapper layer per fresh
collector until property access raised `RecursionError` on the Compat 3.1.x
provider test matrix. Capture each class's true original getters once (stashed
in the class's own `__dict__`) and always wrap those, so re-applying the
polyfill never grows the wrapper chain; production is unaffected (singleton
collector).
---
.../providers/common/compat/lineage/hook.py | 33 +++++--
.../tests/unit/common/compat/lineage/test_hook.py | 108 ++++++++++++++++++++-
2 files changed, 132 insertions(+), 9 deletions(-)
diff --git
a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
index 37b352b2cc5..d1b036998b3 100644
---
a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
+++
b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
@@ -64,6 +64,14 @@ def _add_extra_polyfill(collector):
from airflow.providers.common.compat.sdk import HookLineage as
_BaseHookLineage
+ # Ensure this instance has the extra-tracking attributes. The trigger gate
+ # (`_lacks_add_extra_method`) and the wrappers below key on `_extra`. Only
set them when
+ # missing so re-applying the polyfill never clears a collector that
already has extras.
+ if not hasattr(collector, "_extra"):
+ collector._extra = {}
+ if not hasattr(collector, "_extra_counts"):
+ collector._extra_counts = defaultdict(int)
+
# Add `extra` to HookLineage returned by `collected_assets` property
@attr.define
class ExtraLineageInfo:
@@ -86,12 +94,19 @@ def _add_extra_polyfill(collector):
# run on AF3.2, where this other one is used, so this is fine - we can
ignore.
extra: list[ExtraLineageInfo] = attr.field(factory=list) # type:
ignore[assignment]
- # Initialize extra tracking attributes on this collector instance
- collector._extra = {}
- collector._extra_counts = defaultdict(int)
-
- # Overwrite the `collected_assets` property on a class
- _original_collected_assets = collector.__class__.collected_assets
+ # Overwrite the `collected_assets` property on a class.
+ #
+ # Capture the *true* original getter once per class and always wrap that,
never the wrapper a
+ # previous call installed. This polyfill is re-applied for every fresh
collector instance (the
+ # trigger gate keys on instance-level `_extra`), and on Airflow 2 the
asset-naming layer
+ # re-patches `collected_assets` before each call. Wrapping the *current*
value would capture the
+ # prior compat wrapper as the "original" and grow the getter chain one
level per call until
+ # access raises RecursionError. The stash lives in the class's own
`__dict__` so a subclass that
+ # overrides the property is captured on its own rather than reusing a base
class's original.
+ cls = type(collector)
+ if "_compat_original_collected_assets" not in cls.__dict__:
+ cls._compat_original_collected_assets = cls.collected_assets
+ _original_collected_assets =
cls.__dict__["_compat_original_collected_assets"]
def _compat_collected_assets(self) -> HookLineage:
"""Get the collected hook lineage information."""
@@ -120,8 +135,10 @@ def _add_extra_polyfill(collector):
type(collector).collected_assets = property(_compat_collected_assets)
- # Overwrite the `has_collected` property on a class
- _original_has_collected = collector.__class__.has_collected
+ # Overwrite the `has_collected` property on a class (same
capture-original-once rule).
+ if "_compat_original_has_collected" not in cls.__dict__:
+ cls._compat_original_has_collected = cls.has_collected
+ _original_has_collected = cls.__dict__["_compat_original_has_collected"]
def _compat_has_collected(self) -> bool:
# Defensive check since we patch the class property, but initialized
_extra only on this instance.
diff --git
a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
index fa4612ef8ad..23b93289dfb 100644
--- a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
+++ b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
@@ -16,11 +16,17 @@
# under the License.
from __future__ import annotations
+import sys
+from collections import namedtuple
from unittest import mock
import pytest
-from airflow.providers.common.compat.lineage.hook import
_lacks_add_extra_method, _lacks_asset_methods
+from airflow.providers.common.compat.lineage.hook import (
+ _add_extra_polyfill,
+ _lacks_add_extra_method,
+ _lacks_asset_methods,
+)
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -1023,3 +1029,103 @@ class TestEdgeCases:
assert len(lineage.inputs) == 0
assert len(lineage.outputs) == 0
+
+
+class TestAddExtraPolyfillIdempotency:
+ """Regression tests for the ``add_extra`` polyfill (RecursionError on the
Compat provider matrix).
+
+ ``_add_extra_polyfill`` is re-applied for every fresh collector instance
(the trigger gate keys
+ on instance-level ``_extra``), and on Airflow 2 the asset-naming layer
re-patches
+ ``collected_assets`` before each call. The polyfill must capture the
*true* original getter once
+ and always wrap that; otherwise it wraps the previously installed wrapper
and the getter chain
+ grows one level per call until property access raises ``RecursionError``.
+ """
+
+ @staticmethod
+ def _make_collector_class():
+ """A minimal collector lacking ``add_extra`` (simulates Airflow <
3.2)."""
+ fake_lineage = namedtuple("_FakeLineage", ["inputs", "outputs"])
+
+ class FakeCollector:
+ @property
+ def collected_assets(self):
+ return fake_lineage(inputs=[], outputs=[])
+
+ @property
+ def has_collected(self):
+ return False
+
+ return FakeCollector
+
+ def test_true_original_getter_captured_once(self):
+ fake_collector_cls = self._make_collector_class()
+ real_getter = fake_collector_cls.collected_assets.fget
+
+ _add_extra_polyfill(fake_collector_cls())
+ stored =
fake_collector_cls.__dict__["_compat_original_collected_assets"].fget
+ assert stored is real_getter
+ assert fake_collector_cls.collected_assets.fget is not real_getter #
now a wrapper
+
+ # Re-applying for a fresh instance keeps the original; it never
captures the wrapper.
+ _add_extra_polyfill(fake_collector_cls())
+ assert
fake_collector_cls.__dict__["_compat_original_collected_assets"].fget is
real_getter
+
+ def test_repeated_polyfill_does_not_recurse(self):
+ fake_collector_cls = self._make_collector_class()
+
+ # Many fresh collectors of the same class (as across a test session).
Pre-fix this stacked
+ # one wrapper per call and blew the recursion limit on the next
property access.
+ for _ in range(sys.getrecursionlimit() + 100):
+ _add_extra_polyfill(fake_collector_cls())
+
+ collector = _add_extra_polyfill(fake_collector_cls())
+ collector.add_extra(mock.MagicMock(), "k", "v")
+
+ lineage = collector.collected_assets # must not raise RecursionError
+ assert [extra.key for extra in lineage.extra] == ["k"]
+ assert collector.has_collected is True
+
+ def test_extra_survives_external_repatch_of_collected_assets(self):
+ """Mirrors Airflow 2: the asset-naming layer re-patches
``collected_assets`` (to a plain,
+ no-``extra`` wrapper) before each polyfill call. The extra wrapper
must be re-applied each
+ time so ``collected_assets.extra`` stays present, without the getter
chain growing unbounded.
+ """
+ fake_collector_cls = self._make_collector_class()
+ real_getter = fake_collector_cls.collected_assets.fget
+
+ for _ in range(sys.getrecursionlimit() + 100):
+ # External (asset-naming) layer resets collected_assets to a fresh
plain wrapper.
+ fake_collector_cls.collected_assets = property(lambda self,
_g=real_getter: _g(self))
+ _add_extra_polyfill(fake_collector_cls())
+
+ collector = fake_collector_cls()
+ fake_collector_cls.collected_assets = property(lambda self,
_g=real_getter: _g(self))
+ collector = _add_extra_polyfill(collector)
+ collector.add_extra(mock.MagicMock(), "k", "v")
+
+ lineage = collector.collected_assets
+ assert hasattr(lineage, "extra")
+ assert [extra.key for extra in lineage.extra] == ["k"]
+
+ def test_subclass_with_own_property_is_captured_independently(self):
+ """Idempotency stash is keyed on the class's own ``__dict__``, not
inherited attributes."""
+ base_collector_cls = self._make_collector_class()
+ _add_extra_polyfill(base_collector_cls())
+
+ fake_lineage = namedtuple("_FakeLineage", ["inputs", "outputs"])
+
+ class SubCollector(base_collector_cls):
+ @property
+ def collected_assets(self):
+ return fake_lineage(inputs=[], outputs=[])
+
+ @property
+ def has_collected(self):
+ return False
+
+ sub_getter = SubCollector.collected_assets.fget
+ collector = _add_extra_polyfill(SubCollector())
+ assert SubCollector.__dict__["_compat_original_collected_assets"].fget
is sub_getter
+
+ collector.add_extra(mock.MagicMock(), "k", "v")
+ assert [extra.key for extra in collector.collected_assets.extra] ==
["k"]