This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7463380047347e8fe98202d3b272a1b60e8db5b9
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Aug 8 15:50:22 2023 +0100

    Fix xcom view returning bytes as xcom value (#33202)

    (cherry picked from commit 36c2735ca48d4c2e2d239c210d3732fd8918fed2)
---
 airflow/www/views.py                |  6 +++---
 tests/www/views/test_views_tasks.py | 23 +++++++++++++++++++++--
 2 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 18e062a487..3faedd3d8b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1926,15 +1926,15 @@ class Airflow(AirflowBaseView):
             flash(f"Task [{dag_id}.{task_id}] doesn't seem to exist at the moment", "error")
             return redirect(url_for("Airflow.index"))
 
-        xcom_query = session.execute(
-            select(XCom.key, XCom.value).where(
+        xcom_query = session.scalars(
+            select(XCom).where(
                 XCom.dag_id == dag_id,
                 XCom.task_id == task_id,
                 XCom.execution_date == dttm,
                 XCom.map_index == map_index,
             )
         )
-        attributes = [(k, v) for k, v in xcom_query if not k.startswith("_")]
+        attributes = [(xcom.key, xcom.value) for xcom in xcom_query if not xcom.key.startswith("_")]
 
         title = "XCom"
         return self.render_template(
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 3685f2323c..c7f10d2747 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -29,7 +29,7 @@ import time_machine
 
 from airflow import settings
 from airflow.exceptions import AirflowException
-from airflow.models import DAG, DagBag, DagModel, TaskFail, TaskInstance, TaskReschedule
+from airflow.models import DAG, DagBag, DagModel, TaskFail, TaskInstance, TaskReschedule, XCom
 from airflow.models.dagcode import DagCode
 from airflow.operators.bash import BashOperator
 from airflow.providers.celery.executors.celery_executor import CeleryExecutor
@@ -42,7 +42,7 @@ from airflow.utils.types import DagRunType
 from airflow.www.views import TaskInstanceModelView
 from tests.test_utils.api_connexion_utils import create_user, delete_roles, delete_user
 from tests.test_utils.config import conf_vars
-from tests.test_utils.db import clear_db_runs
+from tests.test_utils.db import clear_db_runs, clear_db_xcom
 from tests.test_utils.www import check_content_in_response, check_content_not_in_response, client_with_login
 
 DEFAULT_DATE = timezone.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
@@ -69,6 +69,13 @@ def init_dagruns(app, reset_dagruns):
             start_date=timezone.utcnow(),
             state=State.RUNNING,
         )
+        XCom.set(
+            key="return_value",
+            value="{'x':1}",
+            task_id="runme_0",
+            dag_id="example_bash_operator",
+            execution_date=DEFAULT_DATE,
+        )
         app.dag_bag.get_dag("example_subdag_operator").create_dagrun(
             run_id=DEFAULT_DAGRUN,
             run_type=DagRunType.SCHEDULED,
@@ -103,6 +110,7 @@ def init_dagruns(app, reset_dagruns):
         )
     yield
     clear_db_runs()
+    clear_db_xcom()
 
 
 @pytest.fixture(scope="module")
@@ -331,6 +339,17 @@ def test_views_get(admin_client, url, contents):
         check_content_in_response(content, resp)
 
 
+def test_xcom_return_value_is_not_bytes(admin_client):
+    url = f"xcom?dag_id=example_bash_operator&task_id=runme_0&execution_date={DEFAULT_VAL}&map_index=-1"
+    resp = admin_client.get(url, follow_redirects=True)
+    # check that {"x":1} is in the response
+    content = "{'x':1}"
+    check_content_in_response(content, resp)
+    # check that b'{"x":1}' is not in the response
+    content = "b'"{\\'x\\':1}"'"
+    check_content_not_in_response(content, resp)
+
+
 def test_rendered_task_view(admin_client):
     url = f"task?task_id=runme_0&dag_id=example_bash_operator&execution_date={DEFAULT_VAL}"
     resp = admin_client.get(url, follow_redirects=True)
