This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7463380047347e8fe98202d3b272a1b60e8db5b9
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Aug 8 15:50:22 2023 +0100

    Fix xcom view returning bytes as xcom value (#33202)
    
    (cherry picked from commit 36c2735ca48d4c2e2d239c210d3732fd8918fed2)
---
 airflow/www/views.py                |  6 +++---
 tests/www/views/test_views_tasks.py | 23 +++++++++++++++++++++--
 2 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 18e062a487..3faedd3d8b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1926,15 +1926,15 @@ class Airflow(AirflowBaseView):
             flash(f"Task [{dag_id}.{task_id}] doesn't seem to exist at the moment", "error")
             return redirect(url_for("Airflow.index"))
 
-        xcom_query = session.execute(
-            select(XCom.key, XCom.value).where(
+        xcom_query = session.scalars(
+            select(XCom).where(
                 XCom.dag_id == dag_id,
                 XCom.task_id == task_id,
                 XCom.execution_date == dttm,
                 XCom.map_index == map_index,
             )
         )
-        attributes = [(k, v) for k, v in xcom_query if not k.startswith("_")]
+        attributes = [(xcom.key, xcom.value) for xcom in xcom_query if not xcom.key.startswith("_")]
 
         title = "XCom"
         return self.render_template(
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 3685f2323c..c7f10d2747 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -29,7 +29,7 @@ import time_machine
 
 from airflow import settings
 from airflow.exceptions import AirflowException
-from airflow.models import DAG, DagBag, DagModel, TaskFail, TaskInstance, TaskReschedule
+from airflow.models import DAG, DagBag, DagModel, TaskFail, TaskInstance, TaskReschedule, XCom
 from airflow.models.dagcode import DagCode
 from airflow.operators.bash import BashOperator
 from airflow.providers.celery.executors.celery_executor import CeleryExecutor
@@ -42,7 +42,7 @@ from airflow.utils.types import DagRunType
 from airflow.www.views import TaskInstanceModelView
 from tests.test_utils.api_connexion_utils import create_user, delete_roles, delete_user
 from tests.test_utils.config import conf_vars
-from tests.test_utils.db import clear_db_runs
+from tests.test_utils.db import clear_db_runs, clear_db_xcom
 from tests.test_utils.www import check_content_in_response, check_content_not_in_response, client_with_login
 
 DEFAULT_DATE = timezone.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
@@ -69,6 +69,13 @@ def init_dagruns(app, reset_dagruns):
             start_date=timezone.utcnow(),
             state=State.RUNNING,
         )
+        XCom.set(
+            key="return_value",
+            value="{'x':1}",
+            task_id="runme_0",
+            dag_id="example_bash_operator",
+            execution_date=DEFAULT_DATE,
+        )
         app.dag_bag.get_dag("example_subdag_operator").create_dagrun(
             run_id=DEFAULT_DAGRUN,
             run_type=DagRunType.SCHEDULED,
@@ -103,6 +110,7 @@ def init_dagruns(app, reset_dagruns):
         )
     yield
     clear_db_runs()
+    clear_db_xcom()
 
 
 @pytest.fixture(scope="module")
@@ -331,6 +339,17 @@ def test_views_get(admin_client, url, contents):
         check_content_in_response(content, resp)
 
 
+def test_xcom_return_value_is_not_bytes(admin_client):
+    url = f"xcom?dag_id=example_bash_operator&task_id=runme_0&execution_date={DEFAULT_VAL}&map_index=-1"
+    resp = admin_client.get(url, follow_redirects=True)
+    # check that {"x":1} is in the response
+    content = "{&#39;x&#39;:1}"
+    check_content_in_response(content, resp)
+    # check that b'{"x":1}' is not in the response
+    content = "b&#39;&#34;{\\&#39;x\\&#39;:1}&#34;&#39;"
+    check_content_not_in_response(content, resp)
+
+
 def test_rendered_task_view(admin_client):
     url = f"task?task_id=runme_0&dag_id=example_bash_operator&execution_date={DEFAULT_VAL}"
     resp = admin_client.get(url, follow_redirects=True)

Reply via email to