This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 97e5881fde22c695fd59e2d57df095a03c4ccdf3
Author: Tatiana Al-Chueyr <[email protected]>
AuthorDate: Fri May 5 19:32:42 2023 +0100

    Add support for dynamic tasks with template fields that contain `pandas.DataFrame` (#30943)
    
    Closes: https://github.com/astronomer/astro-sdk/issues/1911
    (cherry picked from commit e126530667fcac32d324d28491bee792cb71b1d8)
---
 airflow/models/abstractoperator.py  | 19 +++++++++++++++++--
 tests/models/test_mappedoperator.py | 35 +++++++++++++++++++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index f0ec59eb2a..c8584a9f4a 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -564,8 +564,23 @@ class AbstractOperator(Templater, DAGNode):
                     f"{attr_name!r} is configured as a template field "
                     f"but {parent.task_type} does not have this attribute."
                 )
-            if not value:
-                continue
+
+            try:
+                if not value:
+                    continue
+            except Exception:
+                # This may happen if the templated field points to a class which does not support `__bool__`,
+                # such as Pandas DataFrames:
+                # https://github.com/pandas-dev/pandas/blob/9135c3aaf12d26f857fcc787a5b64d521c51e379/pandas/core/generic.py#L1465
+                self.log.info(
+                    "Unable to check if the value of type '%s' is False for task '%s', field '%s'.",
+                    type(value).__name__,
+                    self.task_id,
+                    attr_name,
+                )
+                # We may still want to render custom classes which do not support __bool__
+                pass
+
             try:
                 rendered_content = self.render_template(
                     value,
diff --git a/tests/models/test_mappedoperator.py b/tests/models/test_mappedoperator.py
index cfd77b55ef..d6aef85428 100644
--- a/tests/models/test_mappedoperator.py
+++ b/tests/models/test_mappedoperator.py
@@ -17,7 +17,9 @@
 # under the License.
 from __future__ import annotations
 
+import logging
 from datetime import timedelta
+from unittest.mock import patch
 
 import pendulum
 import pytest
@@ -29,6 +31,7 @@ from airflow.models.param import ParamsDict
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.taskmap import TaskMap
 from airflow.models.xcom_arg import XComArg
+from airflow.utils.context import Context
 from airflow.utils.state import TaskInstanceState
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.trigger_rule import TriggerRule
@@ -57,6 +60,38 @@ def test_task_mapping_with_dag():
     assert mapped.downstream_list == [finish]
 
 
+@patch("airflow.models.abstractoperator.AbstractOperator.render_template")
+def test_task_mapping_with_dag_and_list_of_pandas_dataframe(mock_render_template, caplog):
+    caplog.set_level(logging.INFO)
+
+    class UnrenderableClass:
+        def __bool__(self):
+            raise ValueError("Similar to Pandas DataFrames, this class raises an exception.")
+
+    class CustomOperator(BaseOperator):
+        template_fields = ("arg",)
+
+        def __init__(self, arg, **kwargs):
+            super().__init__(**kwargs)
+            self.arg = arg
+
+        def execute(self, context: Context):
+            pass
+
+    with DAG("test-dag", start_date=DEFAULT_DATE) as dag:
+        task1 = CustomOperator(task_id="op1", arg=None)
+        unrenderable_values = [UnrenderableClass(), UnrenderableClass()]
+        mapped = CustomOperator.partial(task_id="task_2").expand(arg=unrenderable_values)
+        task1 >> mapped
+    dag.test()
+    assert caplog.text.count("task_2 ran successfully") == 2
+    assert (
+        "Unable to check if the value of type 'UnrenderableClass' is False for task 'task_2', field 'arg'"
+        in caplog.text
+    )
+    mock_render_template.assert_called()
+
+
 def test_task_mapping_without_dag_context():
     with DAG("test-dag", start_date=DEFAULT_DATE) as dag:
         task1 = BaseOperator(task_id="op1")

Reply via email to