BasPH commented on a change in pull request #20976:
URL: https://github.com/apache/airflow/pull/20976#discussion_r790592809



##########
File path: tests/models/test_taskinstance.py
##########
@@ -2285,13 +2286,32 @@ def push_something():
             push_something()
 
         ti = dag_maker.create_dagrun().task_instances[0]
-        with pytest.raises(UnmappableXComPushed) as ctx:
+        with pytest.raises(UnmappableXComTypePushed) as ctx:
             self._run_ti_with_faked_mapped_dependants(ti)
 
         assert dag_maker.session.query(TaskMap).count() == 0
         assert ti.state == TaskInstanceState.FAILED
         assert str(ctx.value) == "unmappable return type 'str'"
 
+    @conf_vars({("core", "max_map_length"): "1"})
+    def test_error_if_unmappable_length(self, dag_maker):
+        """If an unmappable return value is used to map, fail the task that 
pushed the XCom."""
+        with dag_maker(dag_id="test_not_recorded_for_unused") as dag:
+
+            @dag.task()
+            def push_something():
+                return [1, 2]
+
+            push_something()
+
+        ti = dag_maker.create_dagrun().task_instances[0]
+        with pytest.raises(UnmappableXComLengthPushed) as ctx:
+            self._run_ti_with_faked_mapped_dependants(ti)
+
+        assert dag_maker.session.query(TaskMap).count() == 0
+        assert ti.state == TaskInstanceState.FAILED
+        assert str(ctx.value) == "unmappable return value size: 2 > 1"

Review comment:
       ```suggestion
           assert str(ctx.value) == "unmappable return value length: 2 > 1"
   ```

##########
File path: airflow/exceptions.py
##########
@@ -110,6 +110,18 @@ def __str__(self) -> str:
         return f"unmappable return type {type(self.value).__qualname__!r}"
 
 
+class UnmappableXComLengthPushed(AirflowException):
+    """Raise when the pushed value is to large to map as a downstream's 
dependency."""
+
+    def __init__(self, value: Sized, max_length: int) -> None:
+        super().__init__(value)
+        self.value = value
+        self.max_length = max_length
+
+    def __str__(self) -> str:
+        return f"unmappable return value size: {len(self.value)} > 
{self.max_length}"

Review comment:
       I would rename "size" to "length" in this message for clarity, since the value being compared is `len(self.value)`.
   
   ```suggestion
           return f"unmappable return value length: {len(self.value)} > {self.max_length}"
   ```

##########
File path: airflow/exceptions.py
##########
@@ -110,6 +110,18 @@ def __str__(self) -> str:
         return f"unmappable return type {type(self.value).__qualname__!r}"
 
 
+class UnmappableXComLengthPushed(AirflowException):
+    """Raise when the pushed value is to large to map as a downstream's 
dependency."""

Review comment:
       ```suggestion
       """Raise when the pushed value is too large to map as a downstream's dependency."""
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to