This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d1455c892fe Fix AIR004* in multiple example DAGs (#62529)
d1455c892fe is described below
commit d1455c892fedf1256ed88e389bd0ededad8ad4a4
Author: Dev-iL <[email protected]>
AuthorDate: Fri Mar 13 21:20:04 2026 +0200
Fix AIR004* in multiple example DAGs (#62529)
---
providers/amazon/tests/system/amazon/aws/example_ssm.py | 10 +++++-----
.../src/airflow/providers/databricks/utils/databricks.py | 9 +++++++--
.../tests/system/databricks/example_databricks.py | 4 +---
.../tests/unit/databricks/utils/test_databricks.py | 13 +++++++++++++
.../system/google/cloud/dataplex/example_dataplex_dp.py | 2 +-
.../system/google/cloud/dataplex/example_dataplex_dq.py | 2 +-
.../google/cloud/vision/example_vision_autogenerated.py | 14 +++++++-------
7 files changed, 35 insertions(+), 19 deletions(-)
diff --git a/providers/amazon/tests/system/amazon/aws/example_ssm.py b/providers/amazon/tests/system/amazon/aws/example_ssm.py
index 82304a4c0c8..98d7f3fe7d5 100644
--- a/providers/amazon/tests/system/amazon/aws/example_ssm.py
+++ b/providers/amazon/tests/system/amazon/aws/example_ssm.py
@@ -211,14 +211,14 @@ with DAG(
# [START howto_sensor_run_command]
await_run_command = SsmRunCommandCompletedSensor(
- task_id="await_run_command", command_id="{{ ti.xcom_pull(task_ids='run_command') }}"
+ task_id="await_run_command", command_id=run_command.output
)
# [END howto_sensor_run_command]
# [START howto_operator_get_command_invocation]
get_command_output = SsmGetCommandInvocationOperator(
task_id="get_command_output",
- command_id="{{ ti.xcom_pull(task_ids='run_command') }}",
+ command_id=run_command.output,
instance_id=instance_id,
)
# [END howto_operator_get_command_invocation]
@@ -237,7 +237,7 @@ with DAG(
wait_command_async = SsmRunCommandCompletedSensor(
task_id="wait_command_async",
- command_id="{{ ti.xcom_pull(task_ids='run_command_async') }}",
+ command_id=run_command_async.output,
fail_on_nonzero_exit=False,
)
# [END howto_operator_ssm_enhanced_async]
@@ -258,7 +258,7 @@ with DAG(
# [START howto_operator_ssm_exit_code_routing]
get_exit_code_output = SsmGetCommandInvocationOperator(
task_id="get_exit_code_output",
- command_id="{{ ti.xcom_pull(task_ids='run_command_async') }}",
+ command_id=run_command_async.output,
instance_id=instance_id,
)
@@ -292,7 +292,7 @@ with DAG(
wait_command_traditional = SsmRunCommandCompletedSensor(
task_id="wait_command_traditional",
- command_id="{{ ti.xcom_pull(task_ids='run_command_traditional') }}",
+ command_id=run_command_traditional.output,
)
# [END howto_operator_ssm_traditional]
diff --git a/providers/databricks/src/airflow/providers/databricks/utils/databricks.py b/providers/databricks/src/airflow/providers/databricks/utils/databricks.py
index d891953fb24..05b6b17710e 100644
--- a/providers/databricks/src/airflow/providers/databricks/utils/databricks.py
+++ b/providers/databricks/src/airflow/providers/databricks/utils/databricks.py
@@ -17,11 +17,11 @@
# under the License.
from __future__ import annotations
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.sdk import AirflowException, XComArg
from airflow.providers.databricks.hooks.databricks import DatabricksHook, RunState
-def normalise_json_content(content, json_path: str = "json") -> str | bool | list | dict:
+def normalise_json_content(content, json_path: str = "json") -> str | bool | list | dict | XComArg:
"""
Normalize content or all values of content if it is a dict to a string.
@@ -32,8 +32,13 @@ def normalise_json_content(content, json_path: str = "json") -> str | bool | lis
The only one exception is when we have boolean values, they can not be converted
to string type because databricks does not understand 'True' or 'False' values.
+
+ XComArg instances are passed through as-is because they are resolved at runtime
+ via template rendering.
"""
normalise = normalise_json_content
+ if isinstance(content, XComArg):
+ return content
if isinstance(content, (str, bool)):
return content
if isinstance(content, (int, float)):
diff --git a/providers/databricks/tests/system/databricks/example_databricks.py b/providers/databricks/tests/system/databricks/example_databricks.py
index 39aaa2891d6..5dca4684eaf 100644
--- a/providers/databricks/tests/system/databricks/example_databricks.py
+++ b/providers/databricks/tests/system/databricks/example_databricks.py
@@ -115,9 +115,7 @@ with DAG(
# [START howto_operator_databricks_run_now]
# Example of using the DatabricksRunNowOperator after creating a job with DatabricksCreateJobsOperator.
- run_now = DatabricksRunNowOperator(
- task_id="run_now", job_id="{{ ti.xcom_pull(task_ids='jobs_create_named') }}"
- )
+ run_now = DatabricksRunNowOperator(task_id="run_now", job_id=jobs_create_named.output)
jobs_create_named >> run_now
# [END howto_operator_databricks_run_now]
diff --git a/providers/databricks/tests/unit/databricks/utils/test_databricks.py b/providers/databricks/tests/unit/databricks/utils/test_databricks.py
index e6aee024a51..33716879713 100644
--- a/providers/databricks/tests/unit/databricks/utils/test_databricks.py
+++ b/providers/databricks/tests/unit/databricks/utils/test_databricks.py
@@ -68,6 +68,19 @@ class TestDatabricksOperatorSharedFunctions:
}
assert normalise_json_content(test_json) == expected
+ def test_normalise_json_content_with_xcom_arg(self):
+ """XComArg values should be passed through since they resolve at runtime."""
+ from airflow.providers.common.compat.sdk import DAG, BaseOperator
+
+ with DAG(dag_id="test_dag"):
+ op = BaseOperator(task_id="test_task")
+ xcom_arg = op.output
+
+ test_json = {"job_id": xcom_arg, "name": "test"}
+ result = normalise_json_content(test_json)
+ assert result["job_id"] is xcom_arg
+ assert result["name"] == "test"
+
def test_validate_trigger_event_success(self):
event = {
"run_id": RUN_ID,
diff --git a/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dp.py b/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dp.py
index 0425c528e71..b81f3a56ec7 100644
--- a/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dp.py
+++ b/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dp.py
@@ -240,7 +240,7 @@ with DAG(
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
- job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
+ job_id=run_data_scan_async.output,
)
# [END howto_dataplex_data_scan_job_state_sensor]
# [START howto_dataplex_get_data_profile_job_operator]
diff --git a/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dq.py b/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
index 4491208e114..8fde0840c34 100644
--- a/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
+++ b/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
@@ -264,7 +264,7 @@ with DAG(
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
- job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
+ job_id=run_data_scan_async.output,
)
# [END howto_dataplex_data_scan_job_state_sensor]
# [START howto_dataplex_get_data_quality_job_operator]
diff --git a/providers/google/tests/system/google/cloud/vision/example_vision_autogenerated.py b/providers/google/tests/system/google/cloud/vision/example_vision_autogenerated.py
index a556d760766..9196c05ac5a 100644
--- a/providers/google/tests/system/google/cloud/vision/example_vision_autogenerated.py
+++ b/providers/google/tests/system/google/cloud/vision/example_vision_autogenerated.py
@@ -181,7 +181,7 @@ with DAG(
# [START howto_operator_vision_product_get]
product_get = CloudVisionGetProductOperator(
location=LOCATION,
- product_id="{{ task_instance.xcom_pull('product_create') }}",
+ product_id=product_create.output,
task_id="product_get",
)
# [END howto_operator_vision_product_get]
@@ -189,7 +189,7 @@ with DAG(
# [START howto_operator_vision_product_update]
product_update = CloudVisionUpdateProductOperator(
location=LOCATION,
- product_id="{{ task_instance.xcom_pull('product_create') }}",
+ product_id=product_create.output,
product=Product(display_name="My Product 2", description="My updated description"),
task_id="product_update",
)
@@ -198,7 +198,7 @@ with DAG(
# [START howto_operator_vision_product_delete]
product_delete = CloudVisionDeleteProductOperator(
location=LOCATION,
- product_id="{{ task_instance.xcom_pull('product_create') }}",
+ product_id=product_create.output,
task_id="product_delete",
)
# [END howto_operator_vision_product_delete]
@@ -207,7 +207,7 @@ with DAG(
reference_image_create = CloudVisionCreateReferenceImageOperator(
location=LOCATION,
reference_image=reference_image,
- product_id="{{ task_instance.xcom_pull('product_create') }}",
+ product_id=product_create.output,
reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID,
retry=Retry(maximum=10.0),
timeout=5,
@@ -218,7 +218,7 @@ with DAG(
# [START howto_operator_vision_reference_image_delete]
reference_image_delete = CloudVisionDeleteReferenceImageOperator(
location=LOCATION,
- product_id="{{ task_instance.xcom_pull('product_create') }}",
+ product_id=product_create.output,
reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID,
retry=Retry(maximum=10.0),
timeout=5,
@@ -230,7 +230,7 @@ with DAG(
add_product_to_product_set = CloudVisionAddProductToProductSetOperator(
location=LOCATION,
product_set_id=product_set_create_output,
- product_id="{{ task_instance.xcom_pull('product_create') }}",
+ product_id=product_create.output,
retry=Retry(maximum=10.0),
timeout=5,
task_id="add_product_to_product_set",
@@ -241,7 +241,7 @@ with DAG(
remove_product_from_product_set = CloudVisionRemoveProductFromProductSetOperator(
location=LOCATION,
product_set_id=product_set_create_output,
- product_id="{{ task_instance.xcom_pull('product_create') }}",
+ product_id=product_create.output,
retry=Retry(maximum=10.0),
timeout=5,
task_id="remove_product_from_product_set",