This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f219bfbe22 Fix xcom arg.py .zip bug (#26636)
f219bfbe22 is described below

commit f219bfbe22e662a8747af19d688bbe843e1a953d
Author: Robert J. McGinness <[email protected]>
AuthorDate: Mon Sep 26 05:02:55 2022 -0400

    Fix xcom arg.py .zip bug (#26636)
---
 airflow/models/xcom_arg.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index 9be82976ae..b70f26cd4f 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -31,7 +31,7 @@ from airflow.models.xcom import XCOM_RETURN_KEY
 from airflow.utils.context import Context
 from airflow.utils.edgemodifier import EdgeModifier
 from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.types import NOTSET
+from airflow.utils.types import NOTSET, ArgNotSet
 
 if TYPE_CHECKING:
     from airflow.models.dag import DAG
@@ -322,7 +322,7 @@ class PlainXComArg(XComArg):
     def resolve(self, context: Context, session: Session = NEW_SESSION) -> Any:
         task_id = self.operator.task_id
         result = context["ti"].xcom_pull(task_ids=task_id, key=str(self.key), 
default=NOTSET, session=session)
-        if result is not NOTSET:
+        if not isinstance(result, ArgNotSet):
             return result
         if self.key == XCOM_RETURN_KEY:
             return None
@@ -437,7 +437,7 @@ class _ZipResult(Sequence):
 
     def __len__(self) -> int:
         lengths = (len(v) for v in self.values)
-        if self.fillvalue is NOTSET:
+        if isinstance(self.fillvalue, ArgNotSet):
             return min(lengths)
         return max(lengths)
 
@@ -460,13 +460,13 @@ class ZipXComArg(XComArg):
         args_iter = iter(self.args)
         first = repr(next(args_iter))
         rest = ", ".join(repr(arg) for arg in args_iter)
-        if self.fillvalue is NOTSET:
+        if isinstance(self.fillvalue, ArgNotSet):
             return f"{first}.zip({rest})"
         return f"{first}.zip({rest}, fillvalue={self.fillvalue!r})"
 
     def _serialize(self) -> dict[str, Any]:
         args = [serialize_xcom_arg(arg) for arg in self.args]
-        if self.fillvalue is NOTSET:
+        if isinstance(self.fillvalue, ArgNotSet):
             return {"args": args}
         return {"args": args, "fillvalue": self.fillvalue}
 
@@ -486,7 +486,7 @@ class ZipXComArg(XComArg):
         ready_lengths = [length for length in all_lengths if length is not 
None]
         if len(ready_lengths) != len(self.args):
             return None  # If any of the referenced XComs is not ready, we are 
not ready either.
-        if self.fillvalue is NOTSET:
+        if isinstance(self.fillvalue, ArgNotSet):
             return min(ready_lengths)
         return max(ready_lengths)
 

Reply via email to