This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e84abc1b26b Remove to_upstream contract from PartitionMapper (#61706)
e84abc1b26b is described below

commit e84abc1b26bebad2aa9b7c221ffe76555dfa3c9a
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Feb 10 16:12:44 2026 +0800

    Remove to_upstream contract from PartitionMapper (#61706)
---
 airflow-core/src/airflow/partition_mapper/base.py     | 10 ++--------
 airflow-core/src/airflow/partition_mapper/identity.py |  9 +--------
 airflow-core/tests/unit/jobs/test_scheduler_job.py    |  5 +----
 3 files changed, 4 insertions(+), 20 deletions(-)

diff --git a/airflow-core/src/airflow/partition_mapper/base.py 
b/airflow-core/src/airflow/partition_mapper/base.py
index e7a5b77d8e4..92e87c0bd28 100644
--- a/airflow-core/src/airflow/partition_mapper/base.py
+++ b/airflow-core/src/airflow/partition_mapper/base.py
@@ -14,13 +14,11 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 from __future__ import annotations
 
 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Any
-
-if TYPE_CHECKING:
-    from collections.abc import Iterable
+from typing import Any
 
 
 class PartitionMapper(ABC):
@@ -34,10 +32,6 @@ class PartitionMapper(ABC):
     def to_downstream(self, key: str) -> str:
         """Return the target key that the given source partition key maps 
to."""
 
-    @abstractmethod
-    def to_upstream(self, key: str) -> Iterable[str]:
-        """Yield the source keys that map to the given target partition key."""
-
     def serialize(self) -> dict[str, Any]:
         return {}
 
diff --git a/airflow-core/src/airflow/partition_mapper/identity.py 
b/airflow-core/src/airflow/partition_mapper/identity.py
index 6be52c56367..4539600da98 100644
--- a/airflow-core/src/airflow/partition_mapper/identity.py
+++ b/airflow-core/src/airflow/partition_mapper/identity.py
@@ -14,21 +14,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from __future__ import annotations
 
-from typing import TYPE_CHECKING
+from __future__ import annotations
 
 from airflow.partition_mapper.base import PartitionMapper
 
-if TYPE_CHECKING:
-    from collections.abc import Iterable
-
 
 class IdentityMapper(PartitionMapper):
     """Partition mapper that does not change the key."""
 
     def to_downstream(self, key: str) -> str:
         return key
-
-    def to_upstream(self, key: str) -> Iterable[str]:
-        yield key
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 0130a04352f..e1db0b698cc 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -22,7 +22,7 @@ import datetime
 import logging
 import os
 from collections import Counter, deque
-from collections.abc import Callable, Generator, Iterable, Iterator
+from collections.abc import Callable, Generator, Iterator
 from contextlib import ExitStack
 from datetime import timedelta
 from pathlib import Path
@@ -8640,9 +8640,6 @@ class Key1Mapper(CorePartitionMapper):
     def to_downstream(self, key: str) -> str:
         return "key-1"
 
-    def to_upstream(self, key: str) -> Iterable[str]:
-        yield key
-
 
 def _find_registered_custom_partition_mapper(import_string: str) -> 
type[CorePartitionMapper]:
     if import_string == qualname(Key1Mapper):

Reply via email to