This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new c85dd5442de Fix xcom for system tests (#49337) c85dd5442de is described below commit c85dd5442de2d729abfb8e4c7117bca0cf0b9131 Author: VladaZakharova <ula...@google.com> AuthorDate: Wed Apr 16 11:03:45 2025 +0200 Fix xcom for system tests (#49337) Co-authored-by: Ulada Zakharava <vlada_zakhar...@epam.com> --- .../google/cloud/cloud_batch/example_cloud_batch.py | 8 ++++---- .../google/cloud/cloud_run/example_cloud_run.py | 19 +++++++++---------- .../system/google/cloud/gcs/example_gcs_to_gcs.py | 2 +- .../kubernetes_engine/example_kubernetes_engine.py | 2 +- .../example_kubernetes_engine_async.py | 2 +- .../google/cloud/translate/example_translate.py | 2 +- .../cloud/vertex_ai/example_vertex_ai_custom_job.py | 6 +++++- .../vertex_ai/example_vertex_ai_model_service.py | 6 +++++- 8 files changed, 27 insertions(+), 20 deletions(-) diff --git a/providers/google/tests/system/google/cloud/cloud_batch/example_cloud_batch.py b/providers/google/tests/system/google/cloud/cloud_batch/example_cloud_batch.py index a10820ba41f..1b34f11f889 100644 --- a/providers/google/tests/system/google/cloud/cloud_batch/example_cloud_batch.py +++ b/providers/google/tests/system/google/cloud/cloud_batch/example_cloud_batch.py @@ -55,7 +55,7 @@ def _assert_jobs(ti): job_names = ti.xcom_pull(task_ids=[list_jobs_task_name], key="return_value") job_names_str = "" if job_names and len(job_names) > 0: - for job in job_names[0]: + for job in job_names: job_names_str += job["name"].split("/")[-1] + " " assert job1_name in job_names_str assert job2_name in job_names_str @@ -63,9 +63,9 @@ def _assert_jobs(ti): def _assert_tasks(ti): tasks_names = ti.xcom_pull(task_ids=[list_tasks_task_name], key="return_value") - assert len(tasks_names[0]) == 2 - assert "tasks/0" in tasks_names[0][0]["name"] - assert "tasks/1" in tasks_names[0][1]["name"] + assert len(tasks_names) == 2 + assert "tasks/0" in tasks_names[0]["name"] + assert "tasks/1" in tasks_names[1]["name"] # [START howto_operator_batch_job_creation] diff --git a/providers/google/tests/system/google/cloud/cloud_run/example_cloud_run.py b/providers/google/tests/system/google/cloud/cloud_run/example_cloud_run.py index 4aee6882171..1b7d83e858d 100644 --- a/providers/google/tests/system/google/cloud/cloud_run/example_cloud_run.py +++ b/providers/google/tests/system/google/cloud/cloud_run/example_cloud_run.py @@ -69,30 +69,29 @@ clean2_task_name = "clean-job2" def _assert_executed_jobs_xcom(ti): job1_dicts = ti.xcom_pull(task_ids=[execute1_task_name], key="return_value") - assert job1_name in job1_dicts[0]["name"] + assert job1_name in job1_dicts["name"] job2_dicts = ti.xcom_pull(task_ids=[execute2_task_name], key="return_value") - assert job2_name in job2_dicts[0]["name"] + assert job2_name in job2_dicts["name"] job3_dicts = ti.xcom_pull(task_ids=[execute3_task_name], key="return_value") - assert job3_name in job3_dicts[0]["name"] + assert job3_name in job3_dicts["name"] def _assert_created_jobs_xcom(ti): job1_dicts = ti.xcom_pull(task_ids=[create1_task_name], key="return_value") - assert job1_name in job1_dicts[0]["name"] + assert job1_name in job1_dicts["name"] job2_dicts = ti.xcom_pull(task_ids=[create2_task_name], key="return_value") - assert job2_name in job2_dicts[0]["name"] + assert job2_name in job2_dicts["name"] job3_dicts = ti.xcom_pull(task_ids=[create3_task_name], key="return_value") - assert job3_name in job3_dicts[0]["name"] + assert job3_name in job3_dicts["name"] def _assert_updated_job(ti): job_dicts = ti.xcom_pull(task_ids=[update_job1_task_name], key="return_value") - job_dict = job_dicts[0] - assert job_dict["labels"]["somelabel"] == "label1" + assert job_dicts["labels"]["somelabel"] == "label1" def _assert_jobs(ti): @@ -101,7 +100,7 @@ def _assert_jobs(ti): job1_exists = False job2_exists = False - for job_dict in job_dicts[0]: + for job_dict in job_dicts: if job1_exists and job2_exists: break @@ -117,7 +116,7 @@ def _assert_jobs(ti): def _assert_one_job(ti): job_dicts = ti.xcom_pull(task_ids=[list_jobs_limit_task_name], key="return_value") - assert len(job_dicts[0]) == 1 + assert len(job_dicts) == 1 # [START howto_cloud_run_job_instance_creation] diff --git a/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gcs.py b/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gcs.py index e3097ac7320..90fce68fa5e 100644 --- a/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gcs.py +++ b/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gcs.py @@ -58,7 +58,7 @@ PREFIX = f"{HOME}/data/{DAG_ID}_{ENV_ID}/" def _assert_copied_files_exist(ti): - objects = ti.xcom_pull(task_ids=["list_objects"], key="return_value")[0] + objects = ti.xcom_pull(task_ids=["list_objects"], key="return_value") assert PREFIX + OBJECT_1 in objects assert f"{PREFIX}subdir/{OBJECT_1}" in objects diff --git a/providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py b/providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py index 584a84ab437..c3d7aedc77c 100644 --- a/providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py +++ b/providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py @@ -95,7 +95,7 @@ with DAG( # [START howto_operator_gke_xcom_result] pod_task_xcom_result = BashOperator( - bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom')[0] }}\"", + bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom') }}\"", task_id="pod_task_xcom_result", ) # [END howto_operator_gke_xcom_result] diff --git a/providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py b/providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py index 161f81e72a4..930a7e01a13 100644 --- a/providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py +++ b/providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py @@ -97,7 +97,7 @@ with DAG( # [START howto_operator_gke_xcom_result_async] pod_task_xcom_result = BashOperator( - bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom_async')[0] }}\"", + bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom_async') }}\"", task_id="pod_task_xcom_result", ) # [END howto_operator_gke_xcom_result_async] diff --git a/providers/google/tests/system/google/cloud/translate/example_translate.py b/providers/google/tests/system/google/cloud/translate/example_translate.py index 44af8bf5d1d..25d6b13ad77 100644 --- a/providers/google/tests/system/google/cloud/translate/example_translate.py +++ b/providers/google/tests/system/google/cloud/translate/example_translate.py @@ -49,7 +49,7 @@ with DAG( # [END howto_operator_translate_text] # [START howto_operator_translate_access] translation_access = BashOperator( - task_id="access", bash_command="echo '{{ task_instance.xcom_pull(\"translate\")[0] }}'" + task_id="access", bash_command="echo '{{ task_instance.xcom_pull(\"translate\") }}'" ) # [END howto_operator_translate_access] product_set_create >> translation_access diff --git a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py index 22c3510d102..a3a3831fee8 100644 --- a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py +++ b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py @@ -76,7 +76,11 @@ REPLICA_COUNT = 1 # For example in Composer the correct path is `gcs/data/california_housing_training_script.py`. # Because `gcs/data/` is shared folder for Airflow's workers. IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", "")) -LOCAL_TRAINING_SCRIPT_PATH = "gcs/data/california_housing_training_script.py" if IS_COMPOSER else "" +LOCAL_TRAINING_SCRIPT_PATH = ( + "gcs/data/california_housing_training_script.py" + if IS_COMPOSER + else "california_housing_training_script.py" +) with DAG( diff --git a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py index e29cc4ea198..bfd4aef7549 100644 --- a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py +++ b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py @@ -91,7 +91,11 @@ CONTAINER_URI = "us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-2:latest" # For example in Composer the correct path is `gcs/data/california_housing_training_script.py`. # Because `gcs/data/` is shared folder for Airflow's workers. IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", "")) -LOCAL_TRAINING_SCRIPT_PATH = "gcs/data/california_housing_training_script.py" if IS_COMPOSER else "" +LOCAL_TRAINING_SCRIPT_PATH = ( + "gcs/data/california_housing_training_script.py" + if IS_COMPOSER + else "california_housing_training_script.py" +) MODEL_OUTPUT_CONFIG = { "artifact_destination": {