This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5a1dd8622c Add config to disable the 'deserialize' XCom API flag (#32176)
5a1dd8622c is described below
commit 5a1dd8622c2108f07bab99038fd14836b75cd7c2
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Mon Jul 10 09:48:40 2023 +0800
Add config to disable the 'deserialize' XCom API flag (#32176)
---
airflow/api_connexion/endpoints/xcom_endpoint.py | 5 +-
airflow/config_templates/config.yml | 9 ++++
airflow/config_templates/default_airflow.cfg | 5 ++
newsfragments/32176.significant.rst | 10 ++++
.../api_connexion/endpoints/test_xcom_endpoint.py | 55 ++++++++++++++++++----
5 files changed, 74 insertions(+), 10 deletions(-)
diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py
index 830cedb51c..81c0fe8f12 100644
--- a/airflow/api_connexion/endpoints/xcom_endpoint.py
+++ b/airflow/api_connexion/endpoints/xcom_endpoint.py
@@ -23,12 +23,13 @@ from sqlalchemy import and_, func, select
from sqlalchemy.orm import Session
from airflow.api_connexion import security
-from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.xcom_schema import XComCollection,
xcom_collection_schema, xcom_schema
from airflow.api_connexion.types import APIResponse
from airflow.models import DagRun as DR, XCom
from airflow.security import permissions
+from airflow.settings import conf
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.session import NEW_SESSION, provide_session
@@ -93,6 +94,8 @@ def get_xcom_entry(
) -> APIResponse:
"""Get an XCom entry."""
if deserialize:
+ if not conf.getboolean("api", "enable_xcom_deserialize_support",
fallback=False):
+ raise BadRequest(detail="XCom deserialization is disabled in
configuration.")
query = select(XCom, XCom.value)
else:
query = select(XCom)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 273f2a36c7..5336a88f4f 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1198,6 +1198,15 @@ api:
version_added: 2.2.0
example: ~
default: ""
+ enable_xcom_deserialize_support:
+ description: |
+ Indicates whether the *xcomEntries* endpoint supports the *deserialize*
+ flag. If set to False, setting this flag in a request would result in a
+ 400 Bad Request error.
+ type: boolean
+ version_added: 2.7.0
+ example: ~
+ default: "False"
lineage:
description: ~
options:
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 439944022f..7cae11065b 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -654,6 +654,11 @@ access_control_allow_methods =
# Separate URLs with space.
access_control_allow_origins =
+# Indicates whether the *xcomEntries* endpoint supports the *deserialize*
+# flag. If set to False, setting this flag in a request would result in a
+# 400 Bad Request error.
+enable_xcom_deserialize_support = False
+
[lineage]
# what lineage backend to use
backend =
diff --git a/newsfragments/32176.significant.rst b/newsfragments/32176.significant.rst
new file mode 100644
index 0000000000..492d5f955d
--- /dev/null
+++ b/newsfragments/32176.significant.rst
@@ -0,0 +1,10 @@
+The ``xcomEntries`` API disables support for the ``deserialize`` flag by default
+
+For security reasons, the ``/dags/*/dagRuns/*/taskInstances/*/xcomEntries/*``
+API endpoint now disables the ``deserialize`` option to deserialize arbitrary
+XCom values in the webserver. To restore the previous behavior, server admins
+may set the ``[api] enable_xcom_deserialize_support`` config to *True*, which
+re-enables the flag.
+
+However, it is strongly advised to **not** enable the feature, and perform
+deserialization at the client side instead.
diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py b/tests/api_connexion/endpoints/test_xcom_endpoint.py
index 45a5894ec6..7cd82fdaa3 100644
--- a/tests/api_connexion/endpoints/test_xcom_endpoint.py
+++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py
@@ -183,24 +183,61 @@ class TestGetXComEntry(TestXComEndpoint):
)
@pytest.mark.parametrize(
- "query, expected_value",
+ "allowed, query, expected_status_or_value",
[
- pytest.param("?deserialize=true", "real deserialized TEST_VALUE",
id="true"),
- pytest.param("?deserialize=false", "orm deserialized TEST_VALUE",
id="false"),
- pytest.param("", "orm deserialized TEST_VALUE", id="default"),
+ pytest.param(
+ True,
+ "?deserialize=true",
+ "real deserialized TEST_VALUE",
+ id="true",
+ ),
+ pytest.param(
+ False,
+ "?deserialize=true",
+ 400,
+ id="disallowed",
+ ),
+ pytest.param(
+ True,
+ "?deserialize=false",
+ "orm deserialized TEST_VALUE",
+ id="false-irrelevant",
+ ),
+ pytest.param(
+ False,
+ "?deserialize=false",
+ "orm deserialized TEST_VALUE",
+ id="false",
+ ),
+ pytest.param(
+ True,
+ "",
+ "orm deserialized TEST_VALUE",
+ id="default-irrelevant",
+ ),
+ pytest.param(
+ False,
+ "",
+ "orm deserialized TEST_VALUE",
+ id="default",
+ ),
],
)
@conf_vars({("core", "xcom_backend"):
"tests.api_connexion.endpoints.test_xcom_endpoint.CustomXCom"})
- def test_custom_xcom_deserialize(self, query, expected_value):
+ def test_custom_xcom_deserialize(self, allowed: bool, query: str,
expected_status_or_value: int | str):
XCom = resolve_xcom_backend()
self._create_xcom_entry("dag", "run", utcnow(), "task", "key",
backend=XCom)
url =
f"/api/v1/dags/dag/dagRuns/run/taskInstances/task/xcomEntries/key{query}"
with mock.patch("airflow.api_connexion.endpoints.xcom_endpoint.XCom",
XCom):
- response = self.client.get(url, environ_overrides={"REMOTE_USER":
"test"})
-
- assert response.status_code == 200
- assert response.json["value"] == expected_value
+ with conf_vars({("api", "enable_xcom_deserialize_support"):
str(allowed)}):
+ response = self.client.get(url,
environ_overrides={"REMOTE_USER": "test"})
+
+ if isinstance(expected_status_or_value, int):
+ assert response.status_code == expected_status_or_value
+ else:
+ assert response.status_code == 200
+ assert response.json["value"] == expected_status_or_value
class TestGetXComEntries(TestXComEndpoint):