This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 9c48e6f3a4 Do not attempt to provide not stringified objects to UI via
xcom if pickling is active (#42388) (#42486)
9c48e6f3a4 is described below
commit 9c48e6f3a4b0010ff5cd3f382c2d09425857d15d
Author: Jens Scheffler <[email protected]>
AuthorDate: Thu Sep 26 06:34:24 2024 +0200
Do not attempt to provide not stringified objects to UI via xcom if
pickling is active (#42388) (#42486)
* Do not attempt to provide not stringified objects to UI via xcom if
pickling is active
* Add pytest
(cherry picked from commit f9877af256acd1b64fcf91aefdf14ab9389b0bbc)
---
airflow/api_connexion/endpoints/xcom_endpoint.py | 2 +-
airflow/api_connexion/openapi/v1.yaml | 2 ++
.../api_connexion/endpoints/test_xcom_endpoint.py | 30 ++++++++++++++++++++++
3 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py
index 59fa9f5aca..5ba0ffa715 100644
--- a/airflow/api_connexion/endpoints/xcom_endpoint.py
+++ b/airflow/api_connexion/endpoints/xcom_endpoint.py
@@ -125,7 +125,7 @@ def get_xcom_entry(
stub.value = XCom.deserialize_value(stub)
item = stub
- if stringify:
+ if stringify or conf.getboolean("core", "enable_xcom_pickling"):
return xcom_schema_string.dump(item)
return xcom_schema_native.dump(item)
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index d91186ff9d..575b83412c 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2039,6 +2039,8 @@ paths:
If set to true (default) the Any value will be returned as string, e.g. a Python representation
of a dict. If set to false it will return the raw data as dict, list, string or whatever was stored.
+ This parameter is not meaningful when using XCom pickling, then it is always returned as string.
+
*New in version 2.10.0*
responses:
"200":
diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py b/tests/api_connexion/endpoints/test_xcom_endpoint.py
index 9f2d652500..7a51714c5b 100644
--- a/tests/api_connexion/endpoints/test_xcom_endpoint.py
+++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py
@@ -174,6 +174,36 @@ class TestGetXComEntry(TestXComEndpoint):
"value": {"key": "value"},
}
+ @conf_vars({("core", "enable_xcom_pickling"): "True"})
+ def test_should_respond_200_native_for_pickled(self):
+ dag_id = "test-dag-id"
+ task_id = "test-task-id"
+ execution_date = "2005-04-02T00:00:00+00:00"
+ xcom_key = "test-xcom-key"
+ execution_date_parsed = parse_execution_date(execution_date)
+ run_id = DagRun.generate_run_id(DagRunType.MANUAL,
execution_date_parsed)
+ value_non_serializable_key = {("201009_NB502104_0421_AHJY23BGXG
(SEQ_WF: 138898)", None): 82359}
+ self._create_xcom_entry(
+ dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key":
value_non_serializable_key}
+ )
+ response = self.client.get(
+
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
+ environ_overrides={"REMOTE_USER": "test"},
+ )
+ assert 200 == response.status_code
+
+ current_data = response.json
+ current_data["timestamp"] = "TIMESTAMP"
+ assert current_data == {
+ "dag_id": dag_id,
+ "execution_date": execution_date,
+ "key": xcom_key,
+ "task_id": task_id,
+ "map_index": -1,
+ "timestamp": "TIMESTAMP",
+ "value": f"{{'key': {str(value_non_serializable_key)}}}",
+ }
+
def test_should_raise_404_for_non_existent_xcom(self):
dag_id = "test-dag-id"
task_id = "test-task-id"