kaxil commented on code in PR #62922:
URL: https://github.com/apache/airflow/pull/62922#discussion_r2963398227


##########
task-sdk/src/airflow/sdk/definitions/iterableoperator.py:
##########
@@ -0,0 +1,354 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import copy
+import os
+from collections import deque
+from collections.abc import Iterable, Sequence
+from concurrent.futures import Future
+from itertools import chain
+from typing import TYPE_CHECKING, Any
+
+try:
+    # Python 3.12+
+    from itertools import batched  # type: ignore[attr-defined]
+except ImportError:
+    from more_itertools import batched  # type: ignore[no-redef]

Review Comment:
   Still unaddressed from previous review: `more-itertools` is not declared as 
a dependency of `task-sdk`. The `try/except ImportError` fallback handles 
`batched` for Python < 3.12, but if `more-itertools` isn't installed, this will 
raise `ImportError` at import time on 3.10/3.11.
   
   Either add `more-itertools` to `task-sdk/pyproject.toml` dependencies (with 
a `python_version < "3.12"` marker), or vendor a simple `batched` 
implementation:
   ```python
   from itertools import islice

   def batched(iterable, n):
       it = iter(iterable)
       while batch := tuple(islice(it, n)):
           yield batch
   ```
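
If the dependency route is taken instead, the marker-gated entry in `task-sdk/pyproject.toml` could look like the following (the version floor `>=10.0` is an assumption, not taken from the PR):

```toml
[project]
dependencies = [
    # Backport of itertools.batched, only needed on interpreters older than 3.12
    'more-itertools>=10.0; python_version < "3.12"',
]
```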



##########
task-sdk/src/airflow/sdk/bases/operator.py:
##########
@@ -1731,6 +1747,50 @@ def execute(self, context):
             return loop.run_until_complete(self.aexecute(context))
 
 
+class DecoratedDeferredAsyncOperator(BaseAsyncOperator):
+    """
+    A decorator operator that wraps another deferred BaseOperator instance.
+
+    Implements the async aexecute() method while delegating all other behavior.
+    """
+
+    def __init__(self, *, operator: BaseOperator, task_deferred: TaskDeferred, **kwargs: Any):
+        super().__init__(task_id=operator.task_id, **kwargs)
+        self._operator = operator
+        self._task_deferred = task_deferred
+
+    async def aexecute(self, context):
+        from airflow.sdk.execution_time.callback_runner import create_executable_runner
+        from airflow.sdk.execution_time.context import context_get_outlet_events
+
+        event = await run_trigger(self._task_deferred.trigger)
+
+        self.log.debug("event: %s", event)
+
+        if event:
+            self.log.debug("next_method: %s", self._task_deferred.method_name)
+
+            if self._task_deferred.method_name:
+                try:
+                    next_method = self._operator.next_callable(
+                        self._task_deferred.method_name,
+                        self._task_deferred.kwargs,
+                    )
+
+                    outlet_events = context_get_outlet_events(context)
+                    runner = create_executable_runner(
+                        func=next_method,
+                        outlet_events=outlet_events,
+                        logger=self.log,
+                    )
+                    return runner.run(context, event.payload)
+                except TaskDeferred as task_deferred:
+                    self._task_deferred = task_deferred
+                    # Recursively handle nested deferrals
+                    return await self.aexecute(context=context)

Review Comment:
   Still unaddressed from previous review: this is unbounded recursion. If the 
callback keeps raising `TaskDeferred`, each recursive `await 
self.aexecute(context=context)` adds a frame. Python's default recursion limit 
(1000) will eventually blow up with `RecursionError`.
   
   Convert to a loop:
   ```python
   while True:
       event = await run_trigger(self._task_deferred.trigger)
       try:
           ...
           return runner.run(context, event.payload)
       except TaskDeferred as task_deferred:
           self._task_deferred = task_deferred
   ```
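
To see why the loop keeps stack depth constant, here is a minimal self-contained sketch of the pattern, with a stub exception and stub trigger standing in for Airflow's `TaskDeferred` and `run_trigger` (names reused for illustration only):

```python
import asyncio


class TaskDeferred(Exception):
    """Stand-in for Airflow's TaskDeferred; carries the state for the next attempt."""

    def __init__(self, n: int):
        self.n = n


async def run_trigger(n: int) -> int:
    # Stub trigger: fires immediately and echoes its input as the "event".
    return n


async def aexecute_loop(n: int) -> str:
    # Loop instead of recursion: each deferral updates local state and retries,
    # so the stack depth stays constant no matter how many deferrals occur.
    while True:
        event = await run_trigger(n)
        try:
            if event > 0:
                # Simulate the callback deferring again.
                raise TaskDeferred(event - 1)
            return "done"
        except TaskDeferred as task_deferred:
            n = task_deferred.n


# 5000 chained deferrals would exceed Python's default recursion limit (1000)
# with the recursive version; the loop handles it in constant stack space.
print(asyncio.run(aexecute_loop(5000)))  # prints "done"
```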



##########
task-sdk/src/airflow/sdk/definitions/mappedoperator.py:
##########
@@ -336,19 +376,20 @@ def __repr__(self):
         return f"<Mapped({self.task_type}): {self.task_id}>"
 
     def __attrs_post_init__(self):
-        from airflow.sdk.definitions.xcom_arg import XComArg
-
-        if self.get_closest_mapped_task_group() is not None:
-            raise NotImplementedError("operator expansion in an expanded task group is not yet supported")
-
-        if self.task_group:
-            self.task_group.add(self)
-        if self.dag:
-            self.dag.add_task(self)
-        XComArg.apply_upstream_relationship(self, self._get_specified_expand_input().value)
-        for k, v in self.partial_kwargs.items():
-            if k in self.template_fields:
-                XComArg.apply_upstream_relationship(self, v)
+        if self._apply_upstream_relationship:

Review Comment:
   Still unaddressed from previous review: `_apply_upstream_relationship=False` 
skips the entire `__attrs_post_init__` body — including `task_group.add(self)` 
and `dag.add_task(self)`. This means `IterableOperator._expand` creates 
`MappedOperator` instances that are never registered with their DAG or task 
group.
   
   If the intent is only to skip `XComArg.apply_upstream_relationship`, the 
guard should be narrower — wrap only lines 389-393, not the whole method body.
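
For illustration, the narrower guard could keep the registration steps unconditional and gate only the XComArg wiring. This sketch reuses the method body removed in this hunk and is not runnable outside the module:

```python
def __attrs_post_init__(self):
    from airflow.sdk.definitions.xcom_arg import XComArg

    if self.get_closest_mapped_task_group() is not None:
        raise NotImplementedError("operator expansion in an expanded task group is not yet supported")

    # Registration must always run, even for internally created mapped operators.
    if self.task_group:
        self.task_group.add(self)
    if self.dag:
        self.dag.add_task(self)

    # Only the upstream-relationship wiring is skipped for internal expansion.
    if self._apply_upstream_relationship:
        XComArg.apply_upstream_relationship(self, self._get_specified_expand_input().value)
        for k, v in self.partial_kwargs.items():
            if k in self.template_fields:
                XComArg.apply_upstream_relationship(self, v)
```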



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
