This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bd2fbb1fe7b Update xcom_pull usage for google system tests (#51527)
bd2fbb1fe7b is described below

commit bd2fbb1fe7b240258ba26f0253feabff6452ec9f
Author: Maksim <[email protected]>
AuthorDate: Mon Jun 9 15:07:48 2025 +0200

    Update xcom_pull usage for google system tests (#51527)
---
 .../cloud/cloud_batch/example_cloud_batch.py       | 11 ++--------
 .../google/cloud/cloud_run/example_cloud_run.py    | 25 ++++++++--------------
 .../system/google/cloud/gcs/example_gcs_to_gcs.py  |  9 +-------
 3 files changed, 12 insertions(+), 33 deletions(-)

diff --git a/providers/google/tests/system/google/cloud/cloud_batch/example_cloud_batch.py b/providers/google/tests/system/google/cloud/cloud_batch/example_cloud_batch.py
index bc3af3d0f09..13ff88d3453 100644
--- a/providers/google/tests/system/google/cloud/cloud_batch/example_cloud_batch.py
+++ b/providers/google/tests/system/google/cloud/cloud_batch/example_cloud_batch.py
@@ -27,7 +27,6 @@ from datetime import datetime
 from google.cloud import batch_v1
 
 from airflow.models.dag import DAG
-from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.providers.google.cloud.operators.cloud_batch import (
     CloudBatchDeleteJobOperator,
     CloudBatchListJobsOperator,
@@ -52,14 +51,8 @@ list_jobs_task_name = "list-jobs"
 list_tasks_task_name = "list-tasks"
 
 
-def _unwrap_xcom(result):
-    if AIRFLOW_V_3_0_PLUS:
-        return result
-    return result[0]
-
-
 def _assert_jobs(ti):
-    job_list = _unwrap_xcom(ti.xcom_pull(task_ids=[list_jobs_task_name], key="return_value"))
+    job_list = ti.xcom_pull(list_jobs_task_name)
     job_names_str = ""
 
     if job_list:
@@ -71,7 +64,7 @@ def _assert_jobs(ti):
 
 
 def _assert_tasks(ti):
-    task_list = _unwrap_xcom(ti.xcom_pull(task_ids=[list_tasks_task_name], key="return_value"))
+    task_list = ti.xcom_pull(list_tasks_task_name)
 
     assert len(task_list) == 2
     assert "tasks/0" in task_list[0]["name"]
diff --git a/providers/google/tests/system/google/cloud/cloud_run/example_cloud_run.py b/providers/google/tests/system/google/cloud/cloud_run/example_cloud_run.py
index 7e7c8aa9095..4b0b60abe10 100644
--- a/providers/google/tests/system/google/cloud/cloud_run/example_cloud_run.py
+++ b/providers/google/tests/system/google/cloud/cloud_run/example_cloud_run.py
@@ -27,7 +27,6 @@ from google.cloud.run_v2 import Job
 from google.cloud.run_v2.types import k8s_min
 
 from airflow.models.dag import DAG
-from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.providers.google.cloud.operators.cloud_run import (
     CloudRunCreateJobOperator,
     CloudRunDeleteJobOperator,
@@ -68,41 +67,35 @@ clean1_task_name = "clean-job1"
 clean2_task_name = "clean-job2"
 
 
-def _unwrap_xcom(result):
-    if AIRFLOW_V_3_0_PLUS:
-        return result
-    return result[0]
-
-
 def _assert_executed_jobs_xcom(ti):
-    job1_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[execute1_task_name], key="return_value"))
+    job1_dict = ti.xcom_pull(execute1_task_name)
     assert job1_name in job1_dict["name"]
 
-    job2_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[execute2_task_name], key="return_value"))
+    job2_dict = ti.xcom_pull(execute2_task_name)
     assert job2_name in job2_dict["name"]
 
-    job3_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[execute3_task_name], key="return_value"))
+    job3_dict = ti.xcom_pull(execute3_task_name)
     assert job3_name in job3_dict["name"]
 
 
 def _assert_created_jobs_xcom(ti):
-    job1_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[create1_task_name], key="return_value"))
+    job1_dict = ti.xcom_pull(create1_task_name)
     assert job1_name in job1_dict["name"]
 
-    job2_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[create2_task_name], key="return_value"))
+    job2_dict = ti.xcom_pull(create2_task_name)
     assert job2_name in job2_dict["name"]
 
-    job3_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[create3_task_name], key="return_value"))
+    job3_dict = ti.xcom_pull(create3_task_name)
     assert job3_name in job3_dict["name"]
 
 
 def _assert_updated_job(ti):
-    job_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[update_job1_task_name], key="return_value"))
+    job_dict = ti.xcom_pull(update_job1_task_name)
     assert job_dict["labels"]["somelabel"] == "label1"
 
 
 def _assert_jobs(ti):
-    job_list = _unwrap_xcom(ti.xcom_pull(task_ids=[list_jobs_task_name], key="return_value"))
+    job_list = ti.xcom_pull(list_jobs_task_name)
 
     job1_exists = any(job1_name in job["name"] for job in job_list)
     job2_exists = any(job2_name in job["name"] for job in job_list)
@@ -112,7 +105,7 @@ def _assert_jobs(ti):
 
 
 def _assert_one_job(ti):
-    job_list = _unwrap_xcom(ti.xcom_pull(task_ids=[list_jobs_limit_task_name], key="return_value"))
+    job_list = ti.xcom_pull(list_jobs_limit_task_name)
     assert len(job_list) == 1
 
 
diff --git a/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gcs.py b/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gcs.py
index e5890a2c9cc..199edc5fad2 100644
--- a/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gcs.py
+++ b/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gcs.py
@@ -30,7 +30,6 @@ from pathlib import Path
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
-from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.providers.google.cloud.operators.gcs import (
     GCSCreateBucketOperator,
     GCSDeleteBucketOperator,
@@ -58,14 +57,8 @@ HOME = "/home/airflow/gcs"
 PREFIX = f"{HOME}/data/{DAG_ID}_{ENV_ID}/"
 
 
-def _unwrap_xcom(result):
-    if AIRFLOW_V_3_0_PLUS:
-        return result
-    return result[0]
-
-
 def _assert_copied_files_exist(ti):
-    objects = _unwrap_xcom(ti.xcom_pull(task_ids=["list_objects"], key="return_value"))
+    objects = ti.xcom_pull("list_objects")
 
     assert PREFIX + OBJECT_1 in objects
     assert f"{PREFIX}subdir/{OBJECT_1}" in objects

Reply via email to