This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 97e5881fde22c695fd59e2d57df095a03c4ccdf3 Author: Tatiana Al-Chueyr <[email protected]> AuthorDate: Fri May 5 19:32:42 2023 +0100 Add support for dynamic tasks with template fields that contain `pandas.DataFrame` (#30943) Closes: https://github.com/astronomer/astro-sdk/issues/1911 (cherry picked from commit e126530667fcac32d324d28491bee792cb71b1d8) --- airflow/models/abstractoperator.py | 19 +++++++++++++++++-- tests/models/test_mappedoperator.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index f0ec59eb2a..c8584a9f4a 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -564,8 +564,23 @@ class AbstractOperator(Templater, DAGNode): f"{attr_name!r} is configured as a template field " f"but {parent.task_type} does not have this attribute." ) - if not value: - continue + + try: + if not value: + continue + except Exception: + # This may happen if the templated field points to a class which does not support `__bool__`, + # such as Pandas DataFrames: + # https://github.com/pandas-dev/pandas/blob/9135c3aaf12d26f857fcc787a5b64d521c51e379/pandas/core/generic.py#L1465 + self.log.info( + "Unable to check if the value of type '%s' is False for task '%s', field '%s'.", + type(value).__name__, + self.task_id, + attr_name, + ) + # We may still want to render custom classes which do not support __bool__ + pass + try: rendered_content = self.render_template( value, diff --git a/tests/models/test_mappedoperator.py b/tests/models/test_mappedoperator.py index cfd77b55ef..d6aef85428 100644 --- a/tests/models/test_mappedoperator.py +++ b/tests/models/test_mappedoperator.py @@ -17,7 +17,9 @@ # under the License. 
from __future__ import annotations +import logging from datetime import timedelta +from unittest.mock import patch import pendulum import pytest @@ -29,6 +31,7 @@ from airflow.models.param import ParamsDict from airflow.models.taskinstance import TaskInstance from airflow.models.taskmap import TaskMap from airflow.models.xcom_arg import XComArg +from airflow.utils.context import Context from airflow.utils.state import TaskInstanceState from airflow.utils.task_group import TaskGroup from airflow.utils.trigger_rule import TriggerRule @@ -57,6 +60,38 @@ def test_task_mapping_with_dag(): assert mapped.downstream_list == [finish] +@patch("airflow.models.abstractoperator.AbstractOperator.render_template") +def test_task_mapping_with_dag_and_list_of_pandas_dataframe(mock_render_template, caplog): + caplog.set_level(logging.INFO) + + class UnrenderableClass: + def __bool__(self): + raise ValueError("Similar to Pandas DataFrames, this class raises an exception.") + + class CustomOperator(BaseOperator): + template_fields = ("arg",) + + def __init__(self, arg, **kwargs): + super().__init__(**kwargs) + self.arg = arg + + def execute(self, context: Context): + pass + + with DAG("test-dag", start_date=DEFAULT_DATE) as dag: + task1 = CustomOperator(task_id="op1", arg=None) + unrenderable_values = [UnrenderableClass(), UnrenderableClass()] + mapped = CustomOperator.partial(task_id="task_2").expand(arg=unrenderable_values) + task1 >> mapped + dag.test() + assert caplog.text.count("task_2 ran successfully") == 2 + assert ( + "Unable to check if the value of type 'UnrenderableClass' is False for task 'task_2', field 'arg'" + in caplog.text + ) + mock_render_template.assert_called() + + def test_task_mapping_without_dag_context(): with DAG("test-dag", start_date=DEFAULT_DATE) as dag: task1 = BaseOperator(task_id="op1")
