This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bd2fbb1fe7b Update xcom_pull usage for google system tests (#51527)
bd2fbb1fe7b is described below
commit bd2fbb1fe7b240258ba26f0253feabff6452ec9f
Author: Maksim <[email protected]>
AuthorDate: Mon Jun 9 15:07:48 2025 +0200
Update xcom_pull usage for google system tests (#51527)
---
.../cloud/cloud_batch/example_cloud_batch.py | 11 ++--------
.../google/cloud/cloud_run/example_cloud_run.py | 25 ++++++++--------------
.../system/google/cloud/gcs/example_gcs_to_gcs.py | 9 +-------
3 files changed, 12 insertions(+), 33 deletions(-)
diff --git a/providers/google/tests/system/google/cloud/cloud_batch/example_cloud_batch.py b/providers/google/tests/system/google/cloud/cloud_batch/example_cloud_batch.py
index bc3af3d0f09..13ff88d3453 100644
--- a/providers/google/tests/system/google/cloud/cloud_batch/example_cloud_batch.py
+++ b/providers/google/tests/system/google/cloud/cloud_batch/example_cloud_batch.py
@@ -27,7 +27,6 @@ from datetime import datetime
from google.cloud import batch_v1
from airflow.models.dag import DAG
-from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.google.cloud.operators.cloud_batch import (
CloudBatchDeleteJobOperator,
CloudBatchListJobsOperator,
@@ -52,14 +51,8 @@ list_jobs_task_name = "list-jobs"
list_tasks_task_name = "list-tasks"
-def _unwrap_xcom(result):
- if AIRFLOW_V_3_0_PLUS:
- return result
- return result[0]
-
-
def _assert_jobs(ti):
- job_list = _unwrap_xcom(ti.xcom_pull(task_ids=[list_jobs_task_name], key="return_value"))
+ job_list = ti.xcom_pull(list_jobs_task_name)
job_names_str = ""
if job_list:
@@ -71,7 +64,7 @@ def _assert_jobs(ti):
def _assert_tasks(ti):
- task_list = _unwrap_xcom(ti.xcom_pull(task_ids=[list_tasks_task_name], key="return_value"))
+ task_list = ti.xcom_pull(list_tasks_task_name)
assert len(task_list) == 2
assert "tasks/0" in task_list[0]["name"]
diff --git a/providers/google/tests/system/google/cloud/cloud_run/example_cloud_run.py b/providers/google/tests/system/google/cloud/cloud_run/example_cloud_run.py
index 7e7c8aa9095..4b0b60abe10 100644
--- a/providers/google/tests/system/google/cloud/cloud_run/example_cloud_run.py
+++ b/providers/google/tests/system/google/cloud/cloud_run/example_cloud_run.py
@@ -27,7 +27,6 @@ from google.cloud.run_v2 import Job
from google.cloud.run_v2.types import k8s_min
from airflow.models.dag import DAG
-from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.google.cloud.operators.cloud_run import (
CloudRunCreateJobOperator,
CloudRunDeleteJobOperator,
@@ -68,41 +67,35 @@ clean1_task_name = "clean-job1"
clean2_task_name = "clean-job2"
-def _unwrap_xcom(result):
- if AIRFLOW_V_3_0_PLUS:
- return result
- return result[0]
-
-
def _assert_executed_jobs_xcom(ti):
- job1_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[execute1_task_name], key="return_value"))
+ job1_dict = ti.xcom_pull(execute1_task_name)
assert job1_name in job1_dict["name"]
- job2_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[execute2_task_name], key="return_value"))
+ job2_dict = ti.xcom_pull(execute2_task_name)
assert job2_name in job2_dict["name"]
- job3_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[execute3_task_name], key="return_value"))
+ job3_dict = ti.xcom_pull(execute3_task_name)
assert job3_name in job3_dict["name"]
def _assert_created_jobs_xcom(ti):
- job1_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[create1_task_name], key="return_value"))
+ job1_dict = ti.xcom_pull(create1_task_name)
assert job1_name in job1_dict["name"]
- job2_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[create2_task_name], key="return_value"))
+ job2_dict = ti.xcom_pull(create2_task_name)
assert job2_name in job2_dict["name"]
- job3_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[create3_task_name], key="return_value"))
+ job3_dict = ti.xcom_pull(create3_task_name)
assert job3_name in job3_dict["name"]
def _assert_updated_job(ti):
- job_dict = _unwrap_xcom(ti.xcom_pull(task_ids=[update_job1_task_name], key="return_value"))
+ job_dict = ti.xcom_pull(update_job1_task_name)
assert job_dict["labels"]["somelabel"] == "label1"
def _assert_jobs(ti):
- job_list = _unwrap_xcom(ti.xcom_pull(task_ids=[list_jobs_task_name], key="return_value"))
+ job_list = ti.xcom_pull(list_jobs_task_name)
job1_exists = any(job1_name in job["name"] for job in job_list)
job2_exists = any(job2_name in job["name"] for job in job_list)
@@ -112,7 +105,7 @@ def _assert_jobs(ti):
def _assert_one_job(ti):
- job_list = _unwrap_xcom(ti.xcom_pull(task_ids=[list_jobs_limit_task_name], key="return_value"))
+ job_list = ti.xcom_pull(list_jobs_limit_task_name)
assert len(job_list) == 1
diff --git a/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gcs.py b/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gcs.py
index e5890a2c9cc..199edc5fad2 100644
--- a/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gcs.py
+++ b/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gcs.py
@@ -30,7 +30,6 @@ from pathlib import Path
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
-from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.google.cloud.operators.gcs import (
GCSCreateBucketOperator,
GCSDeleteBucketOperator,
@@ -58,14 +57,8 @@ HOME = "/home/airflow/gcs"
PREFIX = f"{HOME}/data/{DAG_ID}_{ENV_ID}/"
-def _unwrap_xcom(result):
- if AIRFLOW_V_3_0_PLUS:
- return result
- return result[0]
-
-
def _assert_copied_files_exist(ti):
- objects = _unwrap_xcom(ti.xcom_pull(task_ids=["list_objects"], key="return_value"))
+ objects = ti.xcom_pull("list_objects")
assert PREFIX + OBJECT_1 in objects
assert f"{PREFIX}subdir/{OBJECT_1}" in objects