ashb commented on code in PR #46032:
URL: https://github.com/apache/airflow/pull/46032#discussion_r1930685748


##########
task_sdk/src/airflow/sdk/definitions/_internal/expandinput.py:
##########
@@ -0,0 +1,271 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import functools
+import operator
+from collections.abc import Iterable, Mapping, Sequence, Sized
+from typing import TYPE_CHECKING, Any, ClassVar, Union
+
+import attrs
+
+from airflow.sdk.definitions._internal.mixins import ResolveMixin
+
+if TYPE_CHECKING:
+    from airflow.sdk.definitions.xcom_arg import XComArg
+    from airflow.sdk.types import Operator
+    from airflow.typing_compat import TypeGuard
+
+ExpandInput = Union["DictOfListsExpandInput", "ListOfDictsExpandInput"]
+
+# Each keyword argument to expand() can be an XComArg, sequence, or dict (not
+# any mapping since we need the value to be ordered).
+OperatorExpandArgument = Union["MappedArgument", "XComArg", Sequence, 
dict[str, Any]]
+
+# The single argument of expand_kwargs() can be an XComArg, or a list with each
+# element being either an XComArg or a dict.
+OperatorExpandKwargsArgument = Union["XComArg", Sequence[Union["XComArg", 
Mapping[str, Any]]]]
+
+
+class NotFullyPopulated(RuntimeError):
+    """
+    Raise when ``get_map_lengths`` cannot populate all mapping metadata.
+
+    This is generally due to not all upstream tasks have finished when the
+    function is called.
+    """
+
+    def __init__(self, missing: set[str]) -> None:
+        self.missing = missing
+
+    def __str__(self) -> str:
+        keys = ", ".join(repr(k) for k in sorted(self.missing))
+        return f"Failed to populate all mapping metadata; missing: {keys}"
+
+
+# To replace tedious isinstance() checks.
+def is_mappable(v: Any) -> TypeGuard[OperatorExpandArgument]:
+    from airflow.sdk.definitions.xcom_arg import XComArg
+
+    return isinstance(v, (MappedArgument, XComArg, Mapping, Sequence)) and not 
isinstance(v, str)
+
+
+# To replace tedious isinstance() checks.
+def _is_parse_time_mappable(v: OperatorExpandArgument) -> TypeGuard[Mapping | 
Sequence]:
+    from airflow.models.xcom_arg import XComArg
+
+    return not isinstance(v, (MappedArgument, XComArg))
+
+
+# To replace tedious isinstance() checks.
+def _needs_run_time_resolution(v: OperatorExpandArgument) -> 
TypeGuard[MappedArgument | XComArg]:
+    from airflow.models.xcom_arg import XComArg
+
+    return isinstance(v, (MappedArgument, XComArg))
+
+
[email protected](kw_only=True)
+class MappedArgument(ResolveMixin):
+    """
+    Stand-in stub for task-group-mapping arguments.
+
+    This is very similar to an XComArg, but resolved differently. Declared here
+    (instead of in the task group module) to avoid import cycles.
+    """
+
+    _input: ExpandInput
+    _key: str
+
+    def iter_references(self) -> Iterable[tuple[Operator, str]]:
+        yield from self._input.iter_references()
+
+    def resolve(self, context: Mapping[str, Any], *, include_xcom: bool = 
True) -> Any:
+        data, _ = self._input.resolve(context, include_xcom=include_xcom)
+        return data[self._key]
+
+
[email protected]()
+class DictOfListsExpandInput(ResolveMixin):

Review Comment:
   Yeah, this was mostly "just move the code around" so 🤷🏻  I can't think of a 
better/different name so unless someone has a better idea I'll leave this as is



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to