This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e84abc1b26b Remove to_upstream contract from PartitionMapper (#61706)
e84abc1b26b is described below
commit e84abc1b26bebad2aa9b7c221ffe76555dfa3c9a
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Feb 10 16:12:44 2026 +0800
Remove to_upstream contract from PartitionMapper (#61706)
---
airflow-core/src/airflow/partition_mapper/base.py | 10 ++--------
airflow-core/src/airflow/partition_mapper/identity.py | 9 +--------
airflow-core/tests/unit/jobs/test_scheduler_job.py | 5 +----
3 files changed, 4 insertions(+), 20 deletions(-)
diff --git a/airflow-core/src/airflow/partition_mapper/base.py b/airflow-core/src/airflow/partition_mapper/base.py
index e7a5b77d8e4..92e87c0bd28 100644
--- a/airflow-core/src/airflow/partition_mapper/base.py
+++ b/airflow-core/src/airflow/partition_mapper/base.py
@@ -14,13 +14,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
from __future__ import annotations
from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Any
-
-if TYPE_CHECKING:
- from collections.abc import Iterable
+from typing import Any
class PartitionMapper(ABC):
@@ -34,10 +32,6 @@ class PartitionMapper(ABC):
def to_downstream(self, key: str) -> str:
"""Return the target key that the given source partition key maps to."""
- @abstractmethod
- def to_upstream(self, key: str) -> Iterable[str]:
- """Yield the source keys that map to the given target partition key."""
-
def serialize(self) -> dict[str, Any]:
return {}
diff --git a/airflow-core/src/airflow/partition_mapper/identity.py b/airflow-core/src/airflow/partition_mapper/identity.py
index 6be52c56367..4539600da98 100644
--- a/airflow-core/src/airflow/partition_mapper/identity.py
+++ b/airflow-core/src/airflow/partition_mapper/identity.py
@@ -14,21 +14,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from __future__ import annotations
-from typing import TYPE_CHECKING
+from __future__ import annotations
from airflow.partition_mapper.base import PartitionMapper
-if TYPE_CHECKING:
- from collections.abc import Iterable
-
class IdentityMapper(PartitionMapper):
"""Partition mapper that does not change the key."""
def to_downstream(self, key: str) -> str:
return key
-
- def to_upstream(self, key: str) -> Iterable[str]:
- yield key
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 0130a04352f..e1db0b698cc 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -22,7 +22,7 @@ import datetime
import logging
import os
from collections import Counter, deque
-from collections.abc import Callable, Generator, Iterable, Iterator
+from collections.abc import Callable, Generator, Iterator
from contextlib import ExitStack
from datetime import timedelta
from pathlib import Path
@@ -8640,9 +8640,6 @@ class Key1Mapper(CorePartitionMapper):
def to_downstream(self, key: str) -> str:
return "key-1"
- def to_upstream(self, key: str) -> Iterable[str]:
- yield key
-
def _find_registered_custom_partition_mapper(import_string: str) -> type[CorePartitionMapper]:
if import_string == qualname(Key1Mapper):